Fixing MR intermediate spills. Contributed by Arun Suresh.

(cherry picked from commit 6b710a42e0)
(cherry picked from commit 87862970f1)
commit d9d7bbd99b (parent 5161751433)
Vinod Kumar Vavilapalli, 2015-05-14 16:07:56 -07:00
15 changed files with 154 additions and 44 deletions


@@ -82,6 +82,7 @@ public class LocalContainerLauncher extends AbstractService implements
   private final TaskUmbilicalProtocol umbilical;
   private ExecutorService taskRunner;
   private Thread eventHandler;
+  private byte[] encryptedSpillKey = new byte[] {0};
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();

@@ -156,6 +157,11 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }

+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
+
   /*
    * Uber-AM lifecycle/ordering ("normal" case):

@@ -354,6 +360,10 @@ public class LocalContainerLauncher extends AbstractService implements
       // map to handle)
       conf.setBoolean("mapreduce.task.uberized", true);

+      // Check and handle Encrypted spill key
+      task.setEncryptedSpillKey(encryptedSpillKey);
+      YarnChild.setEncryptedSpillKeyIfRequired(task);
+
       // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
       // etc.), or just assume/hope the state machine(s) and uber-AM work
       // as expected?


@@ -85,13 +85,17 @@ public class TaskAttemptListenerImpl extends CompositeService
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private byte[] encryptedSpillKey;

   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      byte[] secretShuffleKey) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.encryptedSpillKey = secretShuffleKey;
   }

   @Override

@@ -439,6 +443,7 @@ public class TaskAttemptListenerImpl extends CompositeService
         jvmIDToActiveAttemptMap.remove(wJvmID);
         launchedJVMs.remove(wJvmID);
         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        task.setEncryptedSpillKey(encryptedSpillKey);
         jvmTask = new JvmTask(task, false);
       }
     }


@@ -159,6 +159,7 @@ class YarnChild {
       @Override
       public Object run() throws Exception {
         // use job-specified working directory
+        setEncryptedSpillKeyIfRequired(taskFinal);
         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
         taskFinal.run(job, umbilical); // run the task
         return null;

@@ -217,6 +218,23 @@ class YarnChild {
     }
   }

+  /**
+   * Utility method to check if the Encrypted Spill Key needs to be set into the
+   * user credentials of the user running the Map / Reduce Task
+   * @param task The Map / Reduce task to set the Encrypted Spill information in
+   * @throws Exception
+   */
+  public static void setEncryptedSpillKeyIfRequired(Task task) throws
+      Exception {
+    if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
+        .getEncryptedSpillKey().length > 1)) {
+      Credentials creds =
+          UserGroupInformation.getCurrentUser().getCredentials();
+      TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
+      UserGroupInformation.getCurrentUser().addCredentials(creds);
+    }
+  }
+
   /**
    * Configure mapred-local dirs. This config is used by the task for finding
    * out an output directory.
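
Note the `length > 1` guard above: `Task` and `LocalContainerLauncher` initialize the key to the single-byte sentinel `new byte[] {0}`, so a job launched without encrypted spills never modifies the user's credentials; any real key generated by the AM is longer. A minimal illustration of the guard (class name and key values are hypothetical):

    public class SpillKeySentinelDemo {
      public static void main(String[] args) {
        byte[] sentinel = new byte[] {0}; // Task's default: encryption disabled
        byte[] realKey = new byte[16];    // stand-in for an AM-generated key
        System.out.println(sentinel.length > 1); // false -> credentials untouched
        System.out.println(realKey.length > 1);  // true  -> key installed via TokenCache
      }
    }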


@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;

@@ -47,6 +48,7 @@ import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;

@@ -145,6 +147,8 @@ import org.apache.log4j.LogManager;
 import com.google.common.annotations.VisibleForTesting;

+import javax.crypto.KeyGenerator;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.

@@ -172,6 +176,7 @@ public class MRAppMaster extends CompositeService {
    * Priority of the MRAppMaster shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";

   private Clock clock;
   private final long startTime;

@@ -202,6 +207,7 @@ public class MRAppMaster extends CompositeService {
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private byte[] encryptedSpillKey;

   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init

@@ -646,8 +652,22 @@ public class MRAppMaster extends CompositeService {
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
       this.jobCredentials = ((JobConf)conf).getCredentials();
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        int keyLen = conf.getInt(
+            MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+            MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
+        KeyGenerator keyGen =
+            KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
+        keyGen.init(keyLen);
+        encryptedSpillKey = keyGen.generateKey().getEncoded();
+      } else {
+        encryptedSpillKey = new byte[] {0};
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new YarnRuntimeException(e);
     }
   }

@@ -703,7 +723,7 @@ public class MRAppMaster extends CompositeService {
   protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), encryptedSpillKey);
     return lis;
   }

@@ -870,6 +890,8 @@ public class MRAppMaster extends CompositeService {
     if (job.isUber()) {
       this.containerLauncher = new LocalContainerLauncher(context,
           (TaskUmbilicalProtocol) taskAttemptListener);
+      ((LocalContainerLauncher) this.containerLauncher)
+          .setEncryptedSpillKey(encryptedSpillKey);
     } else {
       this.containerLauncher = new ContainerLauncherImpl(context);
     }
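
For readers unfamiliar with the JCA calls above: the AM derives one random HmacSHA1 key per job via `javax.crypto.KeyGenerator`, which accepts an arbitrary bit length for HMAC keys. A standalone sketch, assuming a 128-bit length in place of `MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS`:

    import javax.crypto.KeyGenerator;
    import java.security.NoSuchAlgorithmException;

    public class SpillKeyGenDemo {
      public static void main(String[] args) throws NoSuchAlgorithmException {
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
        keyGen.init(128); // key size in bits; assumed default
        byte[] key = keyGen.generateKey().getEncoded();
        System.out.println("Generated a " + (key.length * 8) + "-bit spill key");
      }
    }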


@@ -59,14 +59,14 @@ public class TestTaskAttemptListenerImpl {
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
     }

     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
       this.taskHeartbeatHandler = hbHandler;
     }


@@ -253,7 +253,7 @@ public class TestFail {
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, null) {
         @Override
         public void startRpcServer(){};
         @Override


@@ -148,6 +148,8 @@ abstract public class Task implements Writable, Configurable {
   private String user;          // user running the job
   private TaskAttemptID taskId; // unique, includes job id
   private int partition;        // id within job
+  private byte[] encryptedSpillKey = new byte[] {0};  // Key Used to encrypt
+                                                      // intermediate spills
   TaskStatus taskStatus;        // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;

@@ -255,6 +257,24 @@ abstract public class Task implements Writable, Configurable {
     this.tokenSecret = tokenSecret;
   }

+  /**
+   * Get Encrypted spill key
+   * @return encrypted spill key
+   */
+  public byte[] getEncryptedSpillKey() {
+    return encryptedSpillKey;
+  }
+
+  /**
+   * Set Encrypted spill key
+   * @param encryptedSpillKey key
+   */
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
+
   /**
    * Get the job token secret
    * @return the token secret

@@ -485,6 +505,8 @@ abstract public class Task implements Writable, Configurable {
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
     Text.writeString(out, user);
+    out.writeInt(encryptedSpillKey.length);
+    out.write(encryptedSpillKey);
     extraData.write(out);
   }

@@ -510,6 +532,9 @@ abstract public class Task implements Writable, Configurable {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
     user = StringInterner.weakIntern(Text.readString(in));
+    int len = in.readInt();
+    encryptedSpillKey = new byte[len];
+    in.readFully(encryptedSpillKey);
     extraData.readFields(in);
   }
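
The last two hunks extend `Task`'s `Writable` format with a length-prefixed key; this is how the key travels from the AM to the task JVM over the umbilical, and it means the AM and task JVMs must both carry this change, since the wire format grows. A minimal sketch of the same encoding using plain `java.io` streams (class name is illustrative):

    import java.io.*;

    public class SpillKeyWireFormatDemo {
      public static void main(String[] args) throws IOException {
        byte[] key = new byte[] {0}; // sentinel, or a real generated key
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length); // length prefix, as in Task.write()
        out.write(key);           // raw key bytes

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        byte[] decoded = new byte[in.readInt()]; // as in Task.readFields()
        in.readFully(decoded);
        System.out.println(java.util.Arrays.equals(key, decoded)); // true
      }
    }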


@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LimitInputStream;

@@ -50,7 +49,7 @@ public class CryptoUtils {

   private static final Log LOG = LogFactory.getLog(CryptoUtils.class);

-  public static boolean isShuffleEncrypted(Configuration conf) {
+  public static boolean isEncryptedSpillEnabled(Configuration conf) {
     return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
         MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
   }

@@ -64,7 +63,7 @@ public class CryptoUtils {
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
       cryptoCodec.generateSecureRandom(iv);
       return iv;

@@ -75,12 +74,12 @@ public class CryptoUtils {

   public static int cryptoPadding(Configuration conf) {
     // Sizeof(IV) + long(start-offset)
-    return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
+    return isEncryptedSpillEnabled(conf) ? CryptoCodec.getInstance(conf)
         .getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
   }

   private static byte[] getEncryptionKey() throws IOException {
-    return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
+    return TokenCache.getEncryptedSpillKey(UserGroupInformation.getCurrentUser()
         .getCredentials());
   }

@@ -102,7 +101,7 @@ public class CryptoUtils {
    */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
       out.write(iv);

@@ -137,7 +136,7 @@ public class CryptoUtils {
    */
   public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
       long length) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       int bufferSize = getBufferSize(conf);
       if (length > -1) {
         in = new LimitInputStream(in, length);

@@ -174,7 +173,7 @@ public class CryptoUtils {
    */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
       int bufferSize = getBufferSize(conf);
       // Not going to be used... but still has to be read...
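
The `cryptoPadding` arithmetic drives the `LocalFetcher` change below: `wrapIfNecessary(conf, out)` prefixes every encrypted segment with an 8-byte plaintext start offset followed by the IV, so a reader must seek past that prefix and subtract it from both segment lengths. A worked example, assuming a cipher suite with 16-byte blocks (e.g. AES-CTR):

    public class CryptoPaddingDemo {
      public static void main(String[] args) {
        int ivLength = 16;    // cipherSuite.getAlgorithmBlockSize(), assumed
        int offsetPrefix = 8; // the long written by wrapIfNecessary(conf, out)
        int padding = ivLength + offsetPrefix;
        System.out.println("cryptoPadding = " + padding + " bytes"); // 24
      }
    }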


@@ -18,12 +18,10 @@
 package org.apache.hadoop.mapreduce;

 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;

@@ -42,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;

@@ -175,13 +172,8 @@ class JobSubmitter {
     if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
       KeyGenerator keyGen;
       try {
-        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
-            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
-                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
-            : SHUFFLE_KEY_LENGTH;
         keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-        keyGen.init(keyLen);
+        keyGen.init(SHUFFLE_KEY_LENGTH);
       } catch (NoSuchAlgorithmException e) {
         throw new IOException("Error generating shuffle secret key", e);
       }
@@ -189,6 +181,11 @@ class JobSubmitter {
       TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
           job.getCredentials());
     }
+    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
+      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
+          " data spill is enabled");
+    }

     copyAndConfigureFiles(job, submitJobDir);


@@ -158,6 +158,7 @@ public class TokenCache {
   public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
   private static final Text JOB_TOKEN = new Text("JobToken");
   private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+  private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");

   /**
    * load job token from a file

@@ -226,6 +227,15 @@ public class TokenCache {
     return getSecretKey(credentials, SHUFFLE_TOKEN);
   }

+  @InterfaceAudience.Private
+  public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(ENC_SPILL_KEY, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getEncryptedSpillKey(Credentials credentials) {
+    return getSecretKey(credentials, ENC_SPILL_KEY);
+  }
+
   /**
    * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
    * instead, this method is included for compatibility against Hadoop-1
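
These helpers give the spill key its own alias in the credentials store, keeping it separate from the shuffle secret that the shuffle handlers still rely on. A minimal round-trip sketch using the APIs added above (class name is illustrative):

    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    public class SpillKeyCredentialsDemo {
      public static void main(String[] args) {
        Credentials creds = new Credentials();
        byte[] key = new byte[16]; // stand-in for an AM-generated key
        TokenCache.setEncryptedSpillKey(key, creds);
        // CryptoUtils.getEncryptionKey() reads it back the same way:
        byte[] recovered = TokenCache.getEncryptedSpillKey(creds);
        System.out.println(java.util.Arrays.equals(key, recovered)); // true
      }
    }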


@@ -127,6 +127,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     long compressedLength = ir.partLength;
     long decompressedLength = ir.rawLength;

+    compressedLength -= CryptoUtils.cryptoPadding(job);
+    decompressedLength -= CryptoUtils.cryptoPadding(job);
+
     // Get the location for the map output - either in-memory or on-disk
     MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
         id);

@@ -150,8 +153,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     inStream = CryptoUtils.wrapIfNecessary(job, inStream);

     try {
-      inStream.seek(ir.startOffset);
+      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
     } finally {
       try {


@@ -253,3 +253,11 @@ You can do this on a per-job basis, or by means of a cluster-wide setting in the
 To set this property in NodeManager, set it in the `yarn-env.sh` file:

     YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all"
+
+Encrypted Intermediate Data Spill files
+---------------------------------------
+
+This capability allows encryption of the intermediate files generated during the merge and shuffle phases.
+It can be enabled by setting the `mapreduce.job.encrypted-intermediate-data` job property to `true`.
+
+**NOTE:** Currently, enabling encrypted intermediate data spills restricts the job to a single attempt.
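
For reference, a job can also opt in programmatically, the way the tests in this commit do; a sketch (class and method names are illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EnableSpillEncryption {
      static JobConf newEncryptedJobConf() {
        JobConf job = new JobConf();
        // same effect as setting mapreduce.job.encrypted-intermediate-data=true
        job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // JobSubmitter will then pin MRJobConfig.MR_AM_MAX_ATTEMPTS to 1
        return job;
      }
    }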


@@ -87,7 +87,7 @@ public class TestMerger {
     jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
     testInMemoryAndOnDiskMerger();
   }


@@ -52,24 +52,31 @@ public class TestMRIntermediateDataEncryption {
   @Test
   public void testSingleReducer() throws Exception {
-    doEncryptionTest(3, 1, 2);
+    doEncryptionTest(3, 1, 2, false);
+  }
+
+  @Test
+  public void testUberMode() throws Exception {
+    doEncryptionTest(3, 1, 2, true);
   }

   @Test
   public void testMultipleMapsPerNode() throws Exception {
-    doEncryptionTest(8, 1, 2);
+    doEncryptionTest(8, 1, 2, false);
   }

   @Test
   public void testMultipleReducers() throws Exception {
-    doEncryptionTest(2, 4, 2);
+    doEncryptionTest(2, 4, 2, false);
   }

-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
-    doEncryptionTest(numMappers, numReducers, numNodes, 1000);
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+      boolean isUber) throws Exception {
+    doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
   }

-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+      int numLines, boolean isUber) throws Exception {
     MiniDFSCluster dfsCluster = null;
     MiniMRClientCluster mrCluster = null;
     FileSystem fileSystem = null;

@@ -85,7 +92,8 @@ public class TestMRIntermediateDataEncryption {
       // Generate input.
       createInput(fileSystem, numMappers, numLines);
       // Run the test.
-      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
+      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
+          numMappers, numReducers, numLines, isUber);
     } finally {
       if (dfsCluster != null) {
         dfsCluster.shutdown();

@@ -111,7 +119,8 @@ public class TestMRIntermediateDataEncryption {
     }
   }

-  private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
+  private void runMergeTest(JobConf job, FileSystem fileSystem,
+      int numMappers, int numReducers, int numLines, boolean isUber)
       throws Exception {
     fileSystem.delete(OUTPUT, true);
     job.setJobName("Test");

@@ -133,6 +142,9 @@ public class TestMRIntermediateDataEncryption {
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);
+    if (isUber) {
+      job.setBoolean("mapreduce.job.ubertask.enable", true);
+    }
     job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     try {
       submittedJob = client.submitJob(job);


@@ -118,6 +118,7 @@ public class TestMapProgress extends TestCase {
       throws IOException, InterruptedException {
     StringBuffer buf = new StringBuffer("Task ");
     buf.append(taskId);
+    if (taskStatus != null) {
       buf.append(" making progress to ");
       buf.append(taskStatus.getProgress());
       String state = taskStatus.getStateString();

@@ -125,6 +126,7 @@ public class TestMapProgress extends TestCase {
       buf.append(" and state of ");
       buf.append(state);
     }
+    }
     LOG.info(buf.toString());
     // ignore phase
     // ignore counters