MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts. (Jason Lowe via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594329 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 2014-05-13 19:10:48 +00:00
parent b0e80d1a47
commit f3c3d9e0c6
9 changed files with 441 additions and 12 deletions
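
For context: the ShuffleHandler only persists and recovers this shuffle state when the NodeManager is run with work-preserving restart enabled, which is what hands the auxiliary service its recovery path. A minimal sketch of the relevant NodeManager configuration, assuming the standard YARN recovery keys (the configuration itself is not part of this change):

// Sketch (assumption): enable NM recovery so auxiliary services such as
// mapreduce_shuffle are given a recovery path that survives NM restarts.
// Property names are the standard YarnConfiguration constants
// (yarn.nodemanager.recovery.enabled / yarn.nodemanager.recovery.dir);
// verify them against the target Hadoop release.
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class NmRecoveryConfigSketch {
  public static YarnConfiguration withRecovery(String recoveryDir) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
    conf.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir);
    return conf;
  }
}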

View File

@@ -191,6 +191,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5774. Job overview in History UI should list reducer phases in
chronological order. (Gera Shegalov via kasha)
MAPREDUCE-5652. NM Recovery. ShuffleHandler should handle NM restarts.
(Jason Lowe via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@@ -242,3 +242,100 @@ For the org.apache.hadoop.util.bloom.* classes:
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
The binary distribution of this product bundles binaries of leveldbjni
(https://github.com/fusesource/leveldbjni), which is available under the
following license:
Copyright (c) 2011 FuseSource Corp. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of FuseSource Corp. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles binaries of leveldb
(http://code.google.com/p/leveldb/), which is available under the following
license:
Copyright (c) 2011 The LevelDB Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles binaries of snappy
(http://code.google.com/p/snappy/), which is available under the following
license:
Copyright 2011, Google Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -465,6 +465,9 @@
<Match>
<Class name="~org\.apache\.hadoop\.mapreduce\.v2\.proto.*" />
</Match>
<Match>
<Package name="org.apache.hadoop.mapred.proto" />
</Match>
<!--
The below fields are accessed locally and only via methods that are synchronized.

View File

@@ -48,7 +48,7 @@ import org.apache.hadoop.io.Text;
@InterfaceStability.Stable
public class JobID extends org.apache.hadoop.mapred.ID
implements Comparable<ID> {
protected static final String JOB = "job";
public static final String JOB = "job";
// Jobid regex for various tools and framework components
public static final String JOBID_REGEX =

View File

@@ -33,14 +33,54 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>ShuffleHandlerRecovery.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -72,6 +76,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
@@ -81,7 +86,14 @@ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -115,6 +127,7 @@ import org.mortbay.jetty.HttpHeaders;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
public class ShuffleHandler extends AuxiliaryService {
@@ -132,6 +145,10 @@ public class ShuffleHandler extends AuxiliaryService {
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version";
private static final String STATE_DB_SCHEMA_VERSION = "1.0";
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -149,14 +166,14 @@ public class ShuffleHandler extends AuxiliaryService {
private boolean shuffleTransferToAllowed;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
private Map<String,String> userRsrc;
private JobTokenSecretManager secretManager;
private DB stateDb = null;
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce_shuffle";
private static final Map<String,String> userRsrc =
new ConcurrentHashMap<String,String>();
private static final JobTokenSecretManager secretManager =
new JobTokenSecretManager();
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 13562;
@@ -292,9 +309,7 @@ public class ShuffleHandler extends AuxiliaryService {
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
// TODO: Once SHuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
userRsrc.put(jobId.toString(), user);
LOG.info("Added token for " + jobId.toString());
secretManager.addTokenForJob(jobId.toString(), jt);
recordJobShuffleInfo(jobId, user, jt);
} catch (IOException e) {
LOG.error("Error during initApp", e);
// TODO add API to AuxiliaryServices to report failures
@@ -305,8 +320,12 @@
public void stopApplication(ApplicationTerminationContext context) {
ApplicationId appId = context.getApplicationId();
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
secretManager.removeTokenForJob(jobId.toString());
userRsrc.remove(jobId.toString());
try {
removeJobShuffleInfo(jobId);
} catch (IOException e) {
LOG.error("Error during stopApp", e);
// TODO add API to AuxiliaryServices to report failures
}
}
@Override
@@ -350,6 +369,9 @@
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
recoverState(conf);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
try {
pipelineFact = new HttpPipelineFactory(conf);
@@ -389,6 +411,9 @@
if (pipelineFact != null) {
pipelineFact.destroy();
}
if (stateDb != null) {
stateDb.close();
}
super.serviceStop();
}
@@ -407,6 +432,140 @@
return new Shuffle(conf);
}
private void recoverState(Configuration conf) throws IOException {
Path recoveryRoot = getRecoveryPath();
if (recoveryRoot != null) {
startStore(recoveryRoot);
Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(stateDb);
iter.seek(bytes(JobID.JOB));
while (iter.hasNext()) {
Map.Entry<byte[],byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!jobPattern.matcher(key).matches()) {
break;
}
recoverJobShuffleInfo(key, entry.getValue());
}
} catch (DBException e) {
throw new IOException("Database error during recovery", e);
} finally {
if (iter != null) {
iter.close();
}
}
}
}
private void startStore(Path recoveryRoot) throws IOException {
Options options = new Options();
options.createIfMissing(false);
options.logger(new LevelDBLogger());
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
byte[] schemaVersionData;
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true);
try {
stateDb = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION);
stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData);
} catch (DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
} else {
throw e;
}
}
if (schemaVersionData != null) {
String schemaVersion = asString(schemaVersionData);
// only support exact schema matches for now
if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) {
throw new IOException("Incompatible state database schema, found "
+ schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION);
}
} else {
throw new IOException("State database schema version not found");
}
}
private void addJobToken(JobID jobId, String user,
Token<JobTokenIdentifier> jobToken) {
userRsrc.put(jobId.toString(), user);
secretManager.addTokenForJob(jobId.toString(), jobToken);
LOG.info("Added token for " + jobId.toString());
}
private void recoverJobShuffleInfo(String jobIdStr, byte[] data)
throws IOException {
JobID jobId;
try {
jobId = JobID.forName(jobIdStr);
} catch (IllegalArgumentException e) {
throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
}
JobShuffleInfoProto proto = JobShuffleInfoProto.parseFrom(data);
String user = proto.getUser();
TokenProto tokenProto = proto.getJobToken();
Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
tokenProto.getIdentifier().toByteArray(),
tokenProto.getPassword().toByteArray(),
new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
addJobToken(jobId, user, jobToken);
}
private void recordJobShuffleInfo(JobID jobId, String user,
Token<JobTokenIdentifier> jobToken) throws IOException {
if (stateDb != null) {
TokenProto tokenProto = TokenProto.newBuilder()
.setIdentifier(ByteString.copyFrom(jobToken.getIdentifier()))
.setPassword(ByteString.copyFrom(jobToken.getPassword()))
.setKind(jobToken.getKind().toString())
.setService(jobToken.getService().toString())
.build();
JobShuffleInfoProto proto = JobShuffleInfoProto.newBuilder()
.setUser(user).setJobToken(tokenProto).build();
try {
stateDb.put(bytes(jobId.toString()), proto.toByteArray());
} catch (DBException e) {
throw new IOException("Error storing " + jobId, e);
}
}
addJobToken(jobId, user, jobToken);
}
private void removeJobShuffleInfo(JobID jobId) throws IOException {
String jobIdStr = jobId.toString();
secretManager.removeTokenForJob(jobIdStr);
userRsrc.remove(jobIdStr);
if (stateDb != null) {
try {
stateDb.delete(bytes(jobIdStr));
} catch (DBException e) {
throw new IOException("Unable to remove " + jobId
+ " from state store", e);
}
}
}
private static class LevelDBLogger implements Logger {
private static final Log LOG = LogFactory.getLog(LevelDBLogger.class);
@Override
public void log(String message) {
LOG.info(message);
}
}
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;

View File

@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.mapred.proto";
option java_outer_classname = "ShuffleHandlerRecoveryProtos";
option java_generic_services = true;
package hadoop.mapreduce;
import "Security.proto";
message JobShuffleInfoProto {
optional string user = 1;
optional hadoop.common.TokenProto jobToken = 2;
}

View File

@@ -51,11 +51,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -68,6 +72,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -645,4 +650,93 @@ public class TestShuffleHandler {
output.writeLong(chk.getChecksum().getValue());
output.close();
}
@Test
public void testRecovery() throws IOException {
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(12345, 1);
final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffle = new ShuffleHandler();
// emulate aux services startup with recovery enabled
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
tmpDir.mkdirs();
try {
shuffle.init(conf);
shuffle.start();
// setup a shuffle token for an application
DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset();
Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text(user),
new Text("shuffleService"));
jt.write(outputBuffer);
shuffle.initializeApplication(new ApplicationInitializationContext(user,
appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength())));
// verify we are authorized to shuffle
int rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// verify we are still authorized to shuffle to the old application
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// shutdown app and verify access is lost
shuffle.stopApplication(new ApplicationTerminationContext(appId));
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
// emulate shuffle handler restart
shuffle.close();
shuffle = new ShuffleHandler();
shuffle.setRecoveryPath(new Path(tmpDir.toString()));
shuffle.init(conf);
shuffle.start();
// verify we still don't have access
rc = getShuffleResponseCode(shuffle, jt);
Assert.assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
} finally {
if (shuffle != null) {
shuffle.close();
}
FileUtil.fullyDelete(tmpDir);
}
}
private static int getShuffleResponseCode(ShuffleHandler shuffle,
Token<JobTokenIdentifier> jt) throws IOException {
URL url = new URL("http://127.0.0.1:"
+ shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
String encHash = SecureShuffleUtils.hashFromString(
SecureShuffleUtils.buildMsgFrom(url),
JobTokenSecretManager.createSecretKey(jt.getPassword()));
conn.addRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect();
int rc = conn.getResponseCode();
conn.disconnect();
return rc;
}
}

View File

@@ -144,6 +144,10 @@
<artifactId>hsqldb</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>