diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index dc917026be4..0617a5b1238 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -68,6 +68,8 @@ Release 0.23.2 - UNRELEASED NEW FEATURES IMPROVEMENTS + HADOOP-8048. Allow merging of Credentials (Daryn Sharp via tgraves) + HADOOP-8032. mvn site:stage-deploy should be able to use the scp protocol to stage documents (Ravi Prakash via tgraves) @@ -75,6 +77,8 @@ Release 0.23.2 - UNRELEASED (szetszwo) OPTIMIZATIONS + HADOOP-8071. Avoid an extra packet in client code when nagling is + disabled. (todd) BUG FIXES @@ -85,6 +89,14 @@ Release 0.23.2 - UNRELEASED HADOOP-8035 Hadoop Maven site is inefficient and runs phases redundantly (abayer via tucu) + HADOOP-8051 HttpFS documentation it is not wired to the generated site (tucu) + + HADOOP-8055. Hadoop tarball distribution lacks a core-site.xml (harsh) + + HADOOP-8052. Hadoop Metrics2 should emit Float.MAX_VALUE (instead of + Double.MAX_VALUE) to avoid making Ganglia's gmetad core. (Varun Kapoor + via mattf) + Release 0.23.1 - 2012-02-08 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/conf/core-site.xml b/hadoop-common-project/hadoop-common/src/main/conf/core-site.xml new file mode 100644 index 00000000000..d2ddf893e49 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/conf/core-site.xml @@ -0,0 +1,20 @@ + + + + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index eb8e704f5f5..bc751f20f2d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -799,9 +799,12 @@ public class Client { header.write(d); call.rpcRequest.write(d); byte[] data = d.getData(); - int dataLength = d.getLength(); - out.writeInt(dataLength); //first put the data length - out.write(data, 0, dataLength);//write the data + int dataLength = d.getLength() - 4; + data[0] = (byte)((dataLength >>> 24) & 0xff); + data[1] = (byte)((dataLength >>> 16) & 0xff); + data[2] = (byte)((dataLength >>> 8) & 0xff); + data[3] = (byte)(dataLength & 0xff); + out.write(data, 0, dataLength + 4);//write the data out.flush(); } } catch(IOException e) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleStat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleStat.java index f154269698a..589062a691c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleStat.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleStat.java @@ -143,8 +143,16 @@ public class SampleStat { @SuppressWarnings("PublicInnerClass") public static class MinMax { - private double min = Double.MAX_VALUE; - private double max = Double.MIN_VALUE; + // Float.MAX_VALUE is used rather than Double.MAX_VALUE, even though the + // min and max variables are of type double. + // Float.MAX_VALUE is big enough, and using Double.MAX_VALUE makes + // Ganglia core due to buffer overflow. + // The same reasoning applies to the MIN_VALUE counterparts. 
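The Client.java hunk above (HADOOP-8071) patches the four-byte length prefix directly into the head of the already-serialized request buffer and then issues a single write(), so the prefix and the RPC payload leave the socket together even when Nagle's algorithm is disabled via TCP_NODELAY. A minimal standalone sketch of the same big-endian length-prefix trick follows; it is illustrative only, not Hadoop code, and the class and method names are made up.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class LengthPrefixedWrite {
      // Reserve 4 bytes at the head of the buffer, serialize the payload behind
      // them, patch the real length in afterwards, and push everything with one
      // write(). A separate out.writeInt(len) followed by out.write(data) can
      // otherwise become two packets on the wire when TCP_NODELAY is set.
      static void send(OutputStream out, byte[] payload) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream d = new DataOutputStream(buf);
        d.writeInt(0);                       // placeholder for the length prefix
        d.write(payload);                    // request body
        byte[] data = buf.toByteArray();
        int dataLength = data.length - 4;    // length of the body only
        data[0] = (byte)((dataLength >>> 24) & 0xff);
        data[1] = (byte)((dataLength >>> 16) & 0xff);
        data[2] = (byte)((dataLength >>>  8) & 0xff);
        data[3] = (byte)( dataLength         & 0xff);
        out.write(data, 0, dataLength + 4);  // prefix + body in a single write
        out.flush();
      }
    }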
+ static final double DEFAULT_MIN_VALUE = Float.MAX_VALUE; + static final double DEFAULT_MAX_VALUE = Float.MIN_VALUE; + + private double min = DEFAULT_MIN_VALUE; + private double max = DEFAULT_MAX_VALUE; public void add(double value) { if (value > max) max = value; @@ -155,8 +163,8 @@ public class SampleStat { public double max() { return max; } public void reset() { - min = Double.MAX_VALUE; - max = Double.MIN_VALUE; + min = DEFAULT_MIN_VALUE; + max = DEFAULT_MAX_VALUE; } public void reset(MinMax other) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java index 152a35496b8..9883604a2f9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java @@ -230,14 +230,34 @@ public class Credentials implements Writable { /** * Copy all of the credentials from one credential object into another. + * Existing secrets and tokens are overwritten. * @param other the credentials to copy */ public void addAll(Credentials other) { + addAll(other, true); + } + + /** + * Copy all of the credentials from one credential object into another. + * Existing secrets and tokens are not overwritten. + * @param other the credentials to copy + */ + public void mergeAll(Credentials other) { + addAll(other, false); + } + + private void addAll(Credentials other, boolean overwrite) { for(Map.Entry secret: other.secretKeysMap.entrySet()) { - secretKeysMap.put(secret.getKey(), secret.getValue()); + Text key = secret.getKey(); + if (!secretKeysMap.containsKey(key) || overwrite) { + secretKeysMap.put(key, secret.getValue()); + } } for(Map.Entry> token: other.tokenMap.entrySet()){ - tokenMap.put(token.getKey(), token.getValue()); + Text key = token.getKey(); + if (!tokenMap.containsKey(key) || overwrite) { + tokenMap.put(key, token.getValue()); + } } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleStat.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleStat.java index 36ca6bb1664..0fb0ad8ace9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleStat.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleStat.java @@ -36,8 +36,8 @@ public class TestSampleStat { assertEquals("mean", 0.0, stat.mean(), EPSILON); assertEquals("variance", 0.0, stat.variance(), EPSILON); assertEquals("stddev", 0.0, stat.stddev(), EPSILON); - assertEquals("min", Double.MAX_VALUE, stat.min(), EPSILON); - assertEquals("max", Double.MIN_VALUE, stat.max(), EPSILON); + assertEquals("min", SampleStat.MinMax.DEFAULT_MIN_VALUE, stat.min(), EPSILON); + assertEquals("max", SampleStat.MinMax.DEFAULT_MAX_VALUE, stat.max(), EPSILON); stat.add(3); assertEquals("num samples", 1L, stat.numSamples()); @@ -60,8 +60,8 @@ public class TestSampleStat { assertEquals("mean", 0.0, stat.mean(), EPSILON); assertEquals("variance", 0.0, stat.variance(), EPSILON); assertEquals("stddev", 0.0, stat.stddev(), EPSILON); - assertEquals("min", Double.MAX_VALUE, stat.min(), EPSILON); - assertEquals("max", Double.MIN_VALUE, stat.max(), EPSILON); + assertEquals("min", SampleStat.MinMax.DEFAULT_MIN_VALUE, stat.min(), EPSILON); + assertEquals("max", SampleStat.MinMax.DEFAULT_MAX_VALUE, 
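The Credentials change above adds mergeAll() alongside addAll(): both copy every secret key and token from another Credentials object, but addAll() overwrites entries whose key already exists while mergeAll() leaves existing entries untouched. A small usage sketch, assuming only the API shown in the hunk (the aliases and values are invented for illustration):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;

    class CredentialsMergeSketch {
      static void demo() {
        Credentials base = new Credentials();
        base.addSecretKey(new Text("alias"), "existing".getBytes());

        Credentials incoming = new Credentials();
        incoming.addSecretKey(new Text("alias"), "incoming".getBytes());
        incoming.addSecretKey(new Text("extra"), "extra".getBytes());

        base.addAll(incoming);    // "alias" is overwritten with "incoming",
                                  // "extra" is added
        base.mergeAll(incoming);  // a no-op at this point: every key already
                                  // exists, so nothing is overwritten or added
      }
    }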
stat.max(), EPSILON); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestCredentials.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestCredentials.java index 7bedd2d028a..56b5c32521d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestCredentials.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestCredentials.java @@ -137,4 +137,81 @@ public class TestCredentials { } tmpFileName.delete(); } + + static Text secret[] = { + new Text("secret1"), + new Text("secret2"), + new Text("secret3"), + new Text("secret4") + }; + static Text service[] = { + new Text("service1"), + new Text("service2"), + new Text("service3"), + new Text("service4") + }; + static Token token[] = { + new Token(), + new Token(), + new Token(), + new Token() + }; + + @Test + public void addAll() { + Credentials creds = new Credentials(); + creds.addToken(service[0], token[0]); + creds.addToken(service[1], token[1]); + creds.addSecretKey(secret[0], secret[0].getBytes()); + creds.addSecretKey(secret[1], secret[1].getBytes()); + + Credentials credsToAdd = new Credentials(); + // one duplicate with different value, one new + credsToAdd.addToken(service[0], token[3]); + credsToAdd.addToken(service[2], token[2]); + credsToAdd.addSecretKey(secret[0], secret[3].getBytes()); + credsToAdd.addSecretKey(secret[2], secret[2].getBytes()); + + creds.addAll(credsToAdd); + assertEquals(3, creds.numberOfTokens()); + assertEquals(3, creds.numberOfSecretKeys()); + // existing token & secret should be overwritten + assertEquals(token[3], creds.getToken(service[0])); + assertEquals(secret[3], new Text(creds.getSecretKey(secret[0]))); + // non-duplicate token & secret should be present + assertEquals(token[1], creds.getToken(service[1])); + assertEquals(secret[1], new Text(creds.getSecretKey(secret[1]))); + // new token & secret should be added + assertEquals(token[2], creds.getToken(service[2])); + assertEquals(secret[2], new Text(creds.getSecretKey(secret[2]))); + } + + @Test + public void mergeAll() { + Credentials creds = new Credentials(); + creds.addToken(service[0], token[0]); + creds.addToken(service[1], token[1]); + creds.addSecretKey(secret[0], secret[0].getBytes()); + creds.addSecretKey(secret[1], secret[1].getBytes()); + + Credentials credsToAdd = new Credentials(); + // one duplicate with different value, one new + credsToAdd.addToken(service[0], token[3]); + credsToAdd.addToken(service[2], token[2]); + credsToAdd.addSecretKey(secret[0], secret[3].getBytes()); + credsToAdd.addSecretKey(secret[2], secret[2].getBytes()); + + creds.mergeAll(credsToAdd); + assertEquals(3, creds.numberOfTokens()); + assertEquals(3, creds.numberOfSecretKeys()); + // existing token & secret should not be overwritten + assertEquals(token[0], creds.getToken(service[0])); + assertEquals(secret[0], new Text(creds.getSecretKey(secret[0]))); + // non-duplicate token & secret should be present + assertEquals(token[1], creds.getToken(service[1])); + assertEquals(secret[1], new Text(creds.getSecretKey(secret[1]))); + // new token & secret should be added + assertEquals(token[2], creds.getToken(service[2])); + assertEquals(secret[2], new Text(creds.getSecretKey(secret[2]))); } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/site.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/site.xml index d6424ebc2f9..01b35e0ae15 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/site.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/site.xml @@ -14,21 +14,16 @@ --> - + + org.apache.maven.skins + maven-stylus-skin + 1.2 + - -   - - - - org.apache.maven.skins - maven-stylus-skin - 1.2 - - - - - - + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index de7bc757bc0..73febce94d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -130,8 +130,14 @@ Release 0.23.2 - UNRELEASED NEW FEATURES + HDFS-2943. Expose last checkpoint time and transaction stats as JMX + metrics. (atm) + IMPROVEMENTS + HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience. + (harsh via szetszwo) + OPTIMIZATIONS BUG FIXES @@ -140,6 +146,14 @@ Release 0.23.2 - UNRELEASED HDFS-2764. TestBackupNode is racy. (atm) + HDFS-2869. Fix an error in the webhdfs docs for the mkdir op (harsh) + + HDFS-776. Fix exception handling in Balancer. (Uma Maheswara Rao G + via szetszwo) + + HDFS-2815. Namenode sometimes oes not come out of safemode during + NN crash + restart. (Uma Maheswara Rao via suresh) + Release 0.23.1 - 2012-02-08 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/webhdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/webhdfs.xml index 32a26f0c4a0..43764ca2758 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/webhdfs.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/webhdfs.xml @@ -349,7 +349,7 @@ Hello, webhdfs user!
  • Submit a HTTP PUT request. -curl -i -X PUT "http://<HOST>:<PORT>/<PATH>?op=MKDIRS[&permission=<OCTAL>]" + curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]" The client receives a response with a boolean JSON object: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index ebd9783fc14..83822e4c31e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -125,6 +125,10 @@ class NameNodeConnector { if (!isBlockTokenEnabled) { return BlockTokenSecretManager.DUMMY_TOKEN; } else { + if (!shouldRun) { + throw new IOException( + "Can not get access token. BlockKeyUpdater is not running"); + } return blockTokenSecretManager.generateToken(null, eb, EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE, BlockTokenSecretManager.AccessMode.COPY)); @@ -221,16 +225,20 @@ class NameNodeConnector { */ class BlockKeyUpdater implements Runnable { public void run() { - while (shouldRun) { - try { - blockTokenSecretManager.setKeys(namenode.getBlockKeys()); - } catch (Exception e) { - LOG.error("Failed to set keys", e); - } - try { + try { + while (shouldRun) { + try { + blockTokenSecretManager.setKeys(namenode.getBlockKeys()); + } catch (IOException e) { + LOG.error("Failed to set keys", e); + } Thread.sleep(keyUpdaterInterval); - } catch (InterruptedException ie) { } + } catch (InterruptedException e) { + LOG.info("InterruptedException in block key updater thread", e); + } catch (Throwable e) { + LOG.error("Exception in block key updater thread", e); + shouldRun = false; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java index 49e0f464d91..31cf30a925d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java @@ -28,10 +28,12 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa * BlockVolumeChoosingPolicy allows a DataNode to * specify what policy is to be used while choosing * a volume for a block request. - * + * + * Note: This is an evolving i/f and is only for + * advanced use. 
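The NameNodeConnector change above (HDFS-776) restructures BlockKeyUpdater so that a failure to fetch keys in one iteration is logged and retried, while an interrupt or any unexpected Throwable ends the loop and clears shouldRun; getAccessToken() then refuses to hand out tokens once the updater is no longer running. A simplified sketch of that loop shape, with hypothetical field and method names standing in for the real ones:

    class KeyUpdaterLoopSketch implements Runnable {
      private volatile boolean shouldRun = true;   // hypothetical run flag
      private final long intervalMillis = 60000L;  // hypothetical interval

      public void run() {
        try {
          while (shouldRun) {
            try {
              refreshKeys();                 // per-iteration work; may fail
            } catch (java.io.IOException e) {
              System.err.println("Failed to refresh keys, will retry: " + e);
            }
            Thread.sleep(intervalMillis);    // an interrupt exits the loop
          }
        } catch (InterruptedException e) {
          System.out.println("Key updater interrupted, exiting");
        } catch (Throwable t) {
          System.err.println("Unexpected error in key updater: " + t);
          shouldRun = false;                 // callers can see the updater died
        }
      }

      private void refreshKeys() throws java.io.IOException {
        // placeholder for fetching fresh block keys from the NameNode
      }
    }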
+ * ***************************************************/ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public interface BlockVolumeChoosingPolicy { /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index e65150ce1ad..dade5aba813 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -710,7 +710,7 @@ public class FSImage implements Closeable { long txId = loader.getLoadedImageTxId(); LOG.info("Loaded image for txid " + txId + " from " + curFile); lastAppliedTxId = txId; - storage.setMostRecentCheckpointTxId(txId); + storage.setMostRecentCheckpointInfo(txId, curFile.lastModified()); } /** @@ -726,7 +726,7 @@ public class FSImage implements Closeable { saver.save(newFile, txid, source, compression); MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest()); - storage.setMostRecentCheckpointTxId(txid); + storage.setMostRecentCheckpointInfo(txid, Util.now()); } /** @@ -988,7 +988,7 @@ public class FSImage implements Closeable { // advertise it as such to other checkpointers // from now on if (txid > storage.getMostRecentCheckpointTxId()) { - storage.setMostRecentCheckpointTxId(txid); + storage.setMostRecentCheckpointInfo(txid, Util.now()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index e4f03386456..92899797e12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1933,7 +1933,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean enforcePermission) throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { - boolean deleteNow = false; ArrayList collectedBlocks = new ArrayList(); writeLock(); @@ -1951,10 +1950,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (!dir.delete(src, collectedBlocks)) { return false; } - deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT; - if (deleteNow) { // Perform small deletes right away - removeBlocks(collectedBlocks); - } } finally { writeUnlock(); } @@ -1963,9 +1958,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, writeLock(); try { - if (!deleteNow) { - removeBlocks(collectedBlocks); // Incremental deletion of blocks - } + removeBlocks(collectedBlocks); // Incremental deletion of blocks } finally { writeUnlock(); } @@ -2659,6 +2652,31 @@ public class FSNamesystem implements Namesystem, FSClusterStats, public int getExpiredHeartbeats() { return datanodeStatistics.getExpiredHeartbeats(); } + + @Metric({"TransactionsSinceLastCheckpoint", + "Number of transactions since last checkpoint"}) + public long getTransactionsSinceLastCheckpoint() { + return getEditLog().getLastWrittenTxId() - + getFSImage().getStorage().getMostRecentCheckpointTxId(); + } + + @Metric({"TransactionsSinceLastLogRoll", + "Number of transactions since last edit log roll"}) + public long getTransactionsSinceLastLogRoll() { + return (getEditLog().getLastWrittenTxId() - + 
getEditLog().getCurSegmentTxId()) + 1; + } + + @Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"}) + public long getLastWrittenTransactionId() { + return getEditLog().getLastWrittenTxId(); + } + + @Metric({"LastCheckpointTime", + "Time in milliseconds since the epoch of the last checkpoint"}) + public long getLastCheckpointTime() { + return getFSImage().getStorage().getMostRecentCheckpointTime(); + } /** @see ClientProtocol#getStats() */ long[] getStats() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 31e8a7ec205..4b282917211 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -125,6 +125,11 @@ public class NNStorage extends Storage implements Closeable { * that have since been written to the edit log. */ protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID; + + /** + * Time of the last checkpoint, in milliseconds since the epoch. + */ + private long mostRecentCheckpointTime = 0; /** * list of failed (and thus removed) storages @@ -417,18 +422,29 @@ public class NNStorage extends Storage implements Closeable { } /** - * Set the transaction ID of the last checkpoint + * Set the transaction ID and time of the last checkpoint + * + * @param txid transaction id of the last checkpoint + * @param time time of the last checkpoint, in millis since the epoch */ - void setMostRecentCheckpointTxId(long txid) { + void setMostRecentCheckpointInfo(long txid, long time) { this.mostRecentCheckpointTxId = txid; + this.mostRecentCheckpointTime = time; } /** - * Return the transaction ID of the last checkpoint. + * @return the transaction ID of the last checkpoint. */ long getMostRecentCheckpointTxId() { return mostRecentCheckpointTxId; } + + /** + * @return the time of the most recent checkpoint in millis since the epoch. 
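To make the new FSNamesystem metrics above concrete (the numbers are hypothetical): if the edit log's last written transaction id is 120, the current segment started at transaction 101, and the most recent checkpoint covered transaction 100, then LastWrittenTransactionId reports 120, TransactionsSinceLastCheckpoint reports 120 - 100 = 20, and TransactionsSinceLastLogRoll reports (120 - 101) + 1 = 20. LastCheckpointTime is simply the wall-clock time recorded by setMostRecentCheckpointInfo() when the last image was loaded or saved.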
+ */ + long getMostRecentCheckpointTime() { + return mostRecentCheckpointTime; + } /** * Write a small file in all available storage directories that diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index 6ca0ffe7b31..6d06da49683 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -57,14 +57,12 @@ public class TestBalancerWithMultipleNameNodes { ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF); ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF); ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF); -// ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF); } private static final long CAPACITY = 500L; private static final String RACK0 = "/rack0"; private static final String RACK1 = "/rack1"; - private static final String RACK2 = "/rack2"; private static final String FILE_NAME = "/tmp.txt"; private static final Path FILE_PATH = new Path(FILE_NAME); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index 3426a5ad1e3..1a9db4a63e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -20,13 +20,12 @@ package org.apache.hadoop.hdfs.server.namenode.metrics; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.*; import java.io.DataInputStream; import java.io.IOException; import java.util.Random; -import junit.framework.TestCase; - import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; @@ -39,17 +38,21 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.MetricsAsserts; import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; /** * Test for metrics published by the Namenode */ -public class TestNameNodeMetrics extends TestCase { +public class TestNameNodeMetrics { private static final Configuration CONF = new HdfsConfiguration(); private static final int DFS_REPLICATION_INTERVAL = 1; private static final Path TEST_ROOT_DIR_PATH = @@ -81,8 +84,8 @@ public class TestNameNodeMetrics extends TestCase { return new 
Path(TEST_ROOT_DIR_PATH, fileName); } - @Override - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build(); cluster.waitActive(); namesystem = cluster.getNamesystem(); @@ -90,8 +93,8 @@ public class TestNameNodeMetrics extends TestCase { fs = (DistributedFileSystem) cluster.getFileSystem(); } - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { cluster.shutdown(); } @@ -115,6 +118,7 @@ public class TestNameNodeMetrics extends TestCase { } /** Test metrics associated with addition of a file */ + @Test public void testFileAdd() throws Exception { // Add files with 100 blocks final Path file = getTestPath("testFileAdd"); @@ -161,6 +165,7 @@ public class TestNameNodeMetrics extends TestCase { } /** Corrupt a block and ensure metrics reflects it */ + @Test public void testCorruptBlock() throws Exception { // Create a file with single block with two replicas final Path file = getTestPath("testCorruptBlock"); @@ -186,6 +191,7 @@ public class TestNameNodeMetrics extends TestCase { /** Create excess blocks by reducing the replication factor for * for a file and ensure metrics reflects it */ + @Test public void testExcessBlocks() throws Exception { Path file = getTestPath("testExcessBlocks"); createFile(file, 100, (short)2); @@ -198,6 +204,7 @@ public class TestNameNodeMetrics extends TestCase { } /** Test to ensure metrics reflects missing blocks */ + @Test public void testMissingBlock() throws Exception { // Create a file with single block with two replicas Path file = getTestPath("testMissingBlocks"); @@ -216,6 +223,7 @@ public class TestNameNodeMetrics extends TestCase { assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS)); } + @Test public void testRenameMetrics() throws Exception { Path src = getTestPath("src"); createFile(src, 100, (short)1); @@ -240,7 +248,8 @@ public class TestNameNodeMetrics extends TestCase { * * @throws IOException in case of an error */ - public void testGetBlockLocationMetric() throws Exception{ + @Test + public void testGetBlockLocationMetric() throws Exception { Path file1_Path = new Path(TEST_ROOT_DIR_PATH, "file1.dat"); // When cluster starts first time there are no file (read,create,open) @@ -268,4 +277,46 @@ public class TestNameNodeMetrics extends TestCase { updateMetrics(); assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS)); } + + /** + * Test NN checkpoint and transaction-related metrics. 
+ */ + @Test + public void testTransactionAndCheckpointMetrics() throws Exception { + long lastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime", + getMetrics(NS_METRICS)); + + assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); + + fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp")); + updateMetrics(); + + assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS)); + + cluster.getNameNodeRpc().rollEditLog(); + updateMetrics(); + + assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); + + cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER); + cluster.getNameNodeRpc().saveNamespace(); + cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + updateMetrics(); + + long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime", + getMetrics(NS_METRICS)); + assertTrue(lastCkptTime < newLastCkptTime); + assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5a13eefcd9b..2c2f37f6e30 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -28,6 +28,8 @@ Release 0.23-PB - Unreleased Release 0.23.2 - UNRELEASED + INCOMPATIBLE CHANGES + NEW FEATURES IMPROVEMENTS @@ -36,9 +38,17 @@ Release 0.23.2 - UNRELEASED BUG FIXES + MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid + JSON. (B Anil Kumar via tgraves) + + MAPREDUCE-3852. Test TestLinuxResourceCalculatorPlugin failing. (Thomas + Graves via mahadev) + Release 0.23.1 - 2012-02-08 - NEW FEATURES + NEW FEATURES + + MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk) MAPREDUCE-3121. NodeManager should handle disk-failures (Ravi Gummadi via mahadev) @@ -51,6 +61,7 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk) IMPROVEMENTS + MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk) MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the @@ -58,9 +69,10 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3375. [Gridmix] Memory Emulation system tests. (Vinay Thota via amarrk) - MAPREDUCE-3840. JobEndNotifier doesn't use the proxyToUse during connecting - (Ravi Prakash via bobby) + MAPREDUCE-3840. JobEndNotifier doesn't use the proxyToUse during connecting + (Ravi Prakash via bobby) + MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests. (Vinay Thota via amarrk) @@ -236,10 +248,6 @@ Release 0.23.1 - 2012-02-08 acmurthy) BUG FIXES - MAPREDUCE-3770. Zombie.getJobConf() results into NPE. (amarrk) - - MAPREDUCE-3804. 
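The transaction-id jumps asserted in testTransactionAndCheckpointMetrics above (1, then 2, then 4, then 6) are consistent with segment-boundary records themselves consuming transaction ids: each rollEditLog() and each saveNamespace() closes the current edit-log segment and opens a new one, so two ids are used up, while the mkdirs() call adds the single "real" namespace transaction.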
yarn webapp interface vulnerable to cross scripting attacks - (Dave Thompson via bobby) MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and ResourceUsageMatcher. (amarrk) @@ -719,6 +727,9 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps but no reduces. (Robert Joseph Evans via vinodkv) + MAPREDUCE-3804. yarn webapp interface vulnerable to cross scripting attacks + (Dave Thompson via bobby) + MAPREDUCE-3354. Changed scripts so that jobhistory server is started by bin/mapred instead of bin/yarn. (Jonathan Eagles via acmurthy) @@ -731,7 +742,6 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3697. Support binary compatibility for Counters after MAPREDUCE-901. (mahadev via acmurthy) - MAPREDUCE-3817. Fixed bin/mapred to allow running of distcp and archive jobs. (Arpit Gupta via acmurthy) @@ -768,6 +778,17 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3828. Ensure that urls in single-node mode are correct. (sseth via acmurthy) + MAPREDUCE-3770. Zombie.getJobConf() results into NPE. (amarrk) + + MAPREDUCE-3843. Job summary log file found missing on the RM host + (Anupam Seth via tgraves) + + MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then + the recovery. (vinodkv) + + MAPREDUCE-3802. Added test to validate that AM can crash multiple times and + still can recover successfully after MAPREDUCE-3846. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES @@ -2658,6 +2679,9 @@ Release 0.22.1 - Unreleased BUG FIXES + MAPREDUCE-3837. Job tracker is not able to recover jobs after crash. + (Mayank Bansal via shv) + Release 0.22.0 - 2011-11-29 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh b/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh index 6fc3ee7e863..2272ae9564d 100644 --- a/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh +++ b/hadoop-mapreduce-project/bin/mr-jobhistory-daemon.sh @@ -20,6 +20,9 @@ # # Environment Variables # +# HADOOP_LOGFILE Hadoop log file. +# HADOOP_ROOT_LOGGER Hadoop root logger. +# HADOOP_JHS_LOGGER Hadoop JobSummary logger. # YARN_CONF_DIR Alternate conf dir. Default is ${YARN_HOME}/conf. # YARN_LOG_DIR Where log files are stored. PWD by default. 
# YARN_MASTER host:path where hadoop code should be rsync'd from @@ -86,8 +89,9 @@ if [ "$YARN_PID_DIR" = "" ]; then fi # some variables -export YARN_LOGFILE=yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.log -export YARN_ROOT_LOGGER=${YARN_ROOT_LOGGER:-INFO,DRFA} +export HADOOP_LOGFILE=yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.log +export HADOOP_ROOT_LOGGER=${HADOOP_ROOT_LOGGER:-INFO,DRFA} +export HADOOP_JHS_LOGGER=${HADOOP_JHS_LOGGER:-INFO,JSA} log=$YARN_LOG_DIR/yarn-$YARN_IDENT_STRING-$command-$HOSTNAME.out pid=$YARN_PID_DIR/yarn-$YARN_IDENT_STRING-$command.pid diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 7e8c3163bc3..212c86cf77e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -244,7 +244,7 @@ public class JobHistoryEventHandler extends AbstractService while (!stopped && !Thread.currentThread().isInterrupted()) { // Log the size of the history-event-queue every so often. - if (eventCounter % 1000 == 0) { + if (eventCounter != 0 && eventCounter % 1000 == 0) { eventCounter = 0; LOG.info("Size of the JobHistory event queue is " + eventQueue.size()); @@ -464,8 +464,10 @@ public class JobHistoryEventHandler extends AbstractService } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - LOG.info("In HistoryEventHandler " - + event.getHistoryEvent().getEventType()); + if (LOG.isDebugEnabled()) { + LOG.debug("In HistoryEventHandler " + + event.getHistoryEvent().getEventType()); + } } catch (IOException e) { LOG.error("Error writing History Event: " + event.getHistoryEvent(), e); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6097e377d18..6c45574b7dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -26,7 +26,6 @@ import java.security.PrivilegedExceptionAction; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; @@ -123,7 +123,7 @@ 
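The JobHistoryEventHandler hunk above adds the eventCounter != 0 guard because 0 % 1000 == 0: without it, the "size of the JobHistory event queue" message is logged on the very first pass through the loop instead of once per thousand events. The LOG.isDebugEnabled() guard further down in the same file avoids building the per-event log string unless debug logging is actually enabled.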
import org.apache.hadoop.yarn.util.ConverterUtils; * The information is shared across different components using AppContext. */ -@SuppressWarnings("deprecation") +@SuppressWarnings("rawtypes") public class MRAppMaster extends CompositeService { private static final Log LOG = LogFactory.getLog(MRAppMaster.class); @@ -138,7 +138,7 @@ public class MRAppMaster extends CompositeService { private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; - private Set completedTasksFromPreviousRun; + private Map completedTasksFromPreviousRun; private List amInfos; private AppContext context; private Dispatcher dispatcher; @@ -596,7 +596,7 @@ public class MRAppMaster extends CompositeService { return dispatcher; } - public Set getCompletedTaskFromPreviousRun() { + public Map getCompletedTaskFromPreviousRun() { return completedTasksFromPreviousRun; } @@ -737,7 +737,6 @@ public class MRAppMaster extends CompositeService { return jobs; } - @SuppressWarnings("rawtypes") @Override public EventHandler getEventHandler() { return dispatcher.getEventHandler(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index e647dc31c97..cd357a23da1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; @@ -133,7 +134,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float cleanupWeight = 0.05f; private float mapWeight = 0.0f; private float reduceWeight = 0.0f; - private final Set completedTasksFromPreviousRun; + private final Map completedTasksFromPreviousRun; private final List amInfos; private final Lock readLock; private final Lock writeLock; @@ -376,7 +377,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, - Set completedTasksFromPreviousRun, MRAppMetrics metrics, + Map completedTasksFromPreviousRun, MRAppMetrics metrics, OutputCommitter committer, boolean newApiCommitter, String userName, long appSubmitTime, List amInfos) { this.applicationAttemptId = applicationAttemptId; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java index 5bf3d94c877..5b0901eba2d 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -38,7 +39,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class MapTaskImpl extends TaskImpl { private final TaskSplitMetaInfo taskSplitMetaInfo; @@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImpl { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java index a2f386aaab0..4258fdfbc31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -37,7 +38,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class ReduceTaskImpl extends TaskImpl { private final int numMapTasks; @@ -47,7 +48,7 @@ public class ReduceTaskImpl extends TaskImpl { int numMapTasks, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token 
jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 9dc135dc1be..e472e99cd21 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -18,13 +18,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; @@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; @@ -208,8 +213,23 @@ public abstract class TaskImpl implements Task, EventHandler { private final StateMachine stateMachine; - - protected int nextAttemptNumber; + + // By default, the next TaskAttempt number is zero. Changes during recovery + protected int nextAttemptNumber = 0; + private List taskAttemptsFromPreviousGeneration = + new ArrayList(); + + private static final class RecoverdAttemptsComparator implements + Comparator { + @Override + public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) { + long diff = attempt1.getStartTime() - attempt2.getStartTime(); + return diff == 0 ? 0 : (diff < 0 ? 
-1 : 1); + } + } + + private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR = + new RecoverdAttemptsComparator(); //should be set to one which comes first //saying COMMIT_PENDING @@ -230,7 +250,7 @@ public abstract class TaskImpl implements Task, EventHandler { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { this.conf = conf; this.clock = clock; @@ -243,10 +263,7 @@ public abstract class TaskImpl implements Task, EventHandler { // have a convention that none of the overrides depends on any // fields that need initialization. maxAttempts = getMaxAttempts(); - taskId = recordFactory.newRecordInstance(TaskId.class); - taskId.setJobId(jobId); - taskId.setId(partition); - taskId.setTaskType(taskType); + taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType); this.partition = partition; this.taskAttemptListener = taskAttemptListener; this.eventHandler = eventHandler; @@ -255,17 +272,37 @@ public abstract class TaskImpl implements Task, EventHandler { this.jobToken = jobToken; this.metrics = metrics; + // See if this is from a previous generation. if (completedTasksFromPreviousRun != null - && completedTasksFromPreviousRun.contains(taskId)) { + && completedTasksFromPreviousRun.containsKey(taskId)) { + // This task has TaskAttempts from previous generation. We have to replay + // them. LOG.info("Task is from previous run " + taskId); - startCount = startCount - 1; + TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId); + Map allAttempts = + taskInfo.getAllTaskAttempts(); + taskAttemptsFromPreviousGeneration = new ArrayList(); + taskAttemptsFromPreviousGeneration.addAll(allAttempts.values()); + Collections.sort(taskAttemptsFromPreviousGeneration, + RECOVERED_ATTEMPTS_COMPARATOR); } - //attempt ids are generated based on MR app startCount so that attempts - //from previous lives don't overstep the current one. - //this assumes that a task won't have more than 1000 attempts in its single - //life - nextAttemptNumber = (startCount - 1) * 1000; + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + // All the previous attempts are exhausted, now start with a new + // generation. + + // All the new TaskAttemptIDs are generated based on MR + // ApplicationAttemptID so that attempts from previous lives don't + // over-step the current one. This assumes that a task won't have more + // than 1000 attempts in its single generation, which is very reasonable. + // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts + // and requires serious medical attention. + nextAttemptNumber = (startCount - 1) * 1000; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); + } // This "this leak" is okay because the retained pointer is in an // instance variable. 
@@ -390,17 +427,23 @@ public abstract class TaskImpl implements Task, EventHandler { //this is always called in read/write lock private long getLaunchTime() { - long launchTime = 0; + long taskLaunchTime = 0; + boolean launchTimeSet = false; for (TaskAttempt at : attempts.values()) { - //select the least launch time of all attempts - if (launchTime == 0 || launchTime > at.getLaunchTime()) { - launchTime = at.getLaunchTime(); + // select the least launch time of all attempts + long attemptLaunchTime = at.getLaunchTime(); + if (attemptLaunchTime != 0 && !launchTimeSet) { + // For the first non-zero launch time + launchTimeSet = true; + taskLaunchTime = attemptLaunchTime; + } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) { + taskLaunchTime = attemptLaunchTime; } } - if (launchTime == 0) { + if (!launchTimeSet) { return this.scheduledTime; } - return launchTime; + return taskLaunchTime; } //this is always called in read/write lock @@ -525,7 +568,16 @@ public abstract class TaskImpl implements Task, EventHandler { attempts.put(attempt.getID(), attempt); break; } - ++nextAttemptNumber; + + // Update nextATtemptNumber + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + ++nextAttemptNumber; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); + } + ++numberUncompletedAttempts; //schedule the nextAttemptNumber if (failedAttempts > 0) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java index 95c4919d224..c7134a46bd7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java @@ -19,8 +19,9 @@ package org.apache.hadoop.mapreduce.v2.app.recover; import java.util.List; -import java.util.Set; +import java.util.Map; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.yarn.Clock; @@ -32,7 +33,7 @@ public interface Recovery { Clock getClock(); - Set getCompletedTasks(); + Map getCompletedTasks(); List getAMInfos(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index e6831f83557..3bf6e075849 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import 
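The getLaunchTime() rewrite above fixes an ordering bug: in the old loop, an attempt that had not launched yet (launch time 0) could reset the running minimum back to zero if it was visited after a launched attempt, making the task fall back to scheduledTime even though an attempt had actually launched. For example (times are hypothetical), visiting attempts with launch times 500, 300 and then 0, the old code ends up returning scheduledTime, while the new code ignores the zero and returns 300.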
org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; @@ -153,8 +153,8 @@ public class RecoveryService extends CompositeService implements Recovery { } @Override - public Set getCompletedTasks() { - return completedTasks.keySet(); + public Map getCompletedTasks() { + return completedTasks; } @Override @@ -189,7 +189,8 @@ public class RecoveryService extends CompositeService implements Recovery { getConfig()); //read the previous history file historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( - histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + LOG.info("History file is at " + historyFile); in = fc.open(historyFile); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); @@ -242,7 +243,7 @@ public class RecoveryService extends CompositeService implements Recovery { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt start time " + attInfo.getStartTime()); + LOG.info("Recovered Attempt start time " + attInfo.getStartTime()); clock.setTime(attInfo.getStartTime()); } else if (event.getType() == TaskAttemptEventType.TA_DONE @@ -250,7 +251,7 @@ public class RecoveryService extends CompositeService implements Recovery { || event.getType() == TaskAttemptEventType.TA_KILL) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt finish time " + attInfo.getFinishTime()); + LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime()); clock.setTime(attInfo.getFinishTime()); } @@ -380,17 +381,17 @@ public class RecoveryService extends CompositeService implements Recovery { } // send the done event - LOG.info("Sending done event to " + aId); + LOG.info("Sending done event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_DONE)); break; case KILLED: - LOG.info("Sending kill event to " + aId); + LOG.info("Sending kill event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_KILL)); break; default: - LOG.info("Sending fail event to " + aId); + LOG.info("Sending fail event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_FAILMSG)); break; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 376227b51ef..509c49e9cc0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -44,6 +44,7 @@ import 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; @@ -74,7 +76,14 @@ public class TestRecovery { private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); - + /** + * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt + * completely disappears because of failed launch, one attempt gets killed and + * one attempt succeeds. AM crashes after the first tasks finishes and + * recovers completely and succeeds in the second generation. + * + * @throws Exception + */ @Test public void testCrashed() throws Exception { @@ -112,7 +121,8 @@ public class TestRecovery { // reduces must be in NEW state Assert.assertEquals("Reduce Task state not correct", TaskState.RUNNING, reduceTask.getReport().getTaskState()); - + + /////////// Play some games with the TaskAttempts of the first task ////// //send the fail signal to the 1st map task attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( @@ -120,29 +130,31 @@ public class TestRecovery { TaskAttemptEventType.TA_FAILMSG)); app.waitForState(task1Attempt1, TaskAttemptState.FAILED); - - while (mapTask1.getAttempts().size() != 2) { + + int timeOut = 0; + while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(2, mapTask1.getAttempts().size()); Iterator itr = mapTask1.getAttempts().values().iterator(); itr.next(); TaskAttempt task1Attempt2 = itr.next(); - app.waitForState(task1Attempt2, TaskAttemptState.RUNNING); - - //send the kill signal to the 1st map 2nd attempt + // This attempt will automatically fail because of the way ContainerLauncher + // is setup + // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846 app.getContext().getEventHandler().handle( - new TaskAttemptEvent( - task1Attempt2.getID(), - TaskAttemptEventType.TA_KILL)); - - app.waitForState(task1Attempt2, TaskAttemptState.KILLED); - - while (mapTask1.getAttempts().size() != 3) { + new TaskAttemptEvent(task1Attempt2.getID(), + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + app.waitForState(task1Attempt2, TaskAttemptState.FAILED); + + timeOut = 0; + while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(3, mapTask1.getAttempts().size()); itr = mapTask1.getAttempts().values().iterator(); itr.next(); itr.next(); @@ -150,12 +162,36 @@ public class TestRecovery { app.waitForState(task1Attempt3, TaskAttemptState.RUNNING); - //send the done signal to the 1st map 3rd 
+    //send the kill signal to the 1st map 3rd attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task1Attempt3.getID(),
+            TaskAttemptEventType.TA_KILL));
+
+    app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
+
+    timeOut = 0;
+    while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
+      Thread.sleep(2000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Assert.assertEquals(4, mapTask1.getAttempts().size());
+    itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    itr.next();
+    itr.next();
+    TaskAttempt task1Attempt4 = itr.next();
+
+    app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 1st map 4th attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt4.getID(),
             TaskAttemptEventType.TA_DONE));
+    /////////// End of games with the TaskAttempts of the first task //////
+
     //wait for first map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
     long task1StartTime = mapTask1.getReport().getStartTime();
@@ -241,6 +277,136 @@
     // available in the failed attempt should be available here
   }
 
+  @Test
+  public void testMultipleCrashes() throws Exception {
+
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
+            ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    Iterator it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the 1st map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Crash the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    //in rerun the 1st and 2nd map will be recovered from previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // The maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testOutputRecovery() throws Exception {
     int runCount = 0;
@@ -552,7 +718,7 @@
     }
   }
 
-  class MRAppWithHistory extends MRApp {
+  static class MRAppWithHistory extends MRApp {
    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
        String testName, boolean cleanOnStart, int startCount) {
      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
@@ -567,7 +733,17 @@
     @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
-      MockContainerLauncher launcher = new MockContainerLauncher();
+      MockContainerLauncher launcher = new MockContainerLauncher() {
+        @Override
+        public void handle(ContainerLauncherEvent event) {
+          TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+          // Pass everything except the 2nd attempt of the first task.
+          if (taskAttemptID.getId() != 1
+              || taskAttemptID.getTaskId().getId() != 0) {
+            super.handle(event);
+          }
+        }
+      };
       launcher.shufflePort = 5467;
       return launcher;
     }
@@ -581,7 +757,7 @@
     }
   }
 
-  class RecoveryServiceWithCustomDispatcher extends RecoveryService {
+  static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
 
     public RecoveryServiceWithCustomDispatcher(
         ApplicationAttemptId applicationAttemptId, Clock clock,
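The reworked waits in this test are now bounded: each spin on Thread.sleep(2000) gives up after ten tries and the following Assert.assertEquals reports a clear failure instead of letting the build hang. The same idea could be factored into a small helper; a sketch under assumed names, illustrative only and not part of the patch:

import java.util.concurrent.Callable;

// Hypothetical helper: bounded polling instead of an unbounded while/sleep loop.
final class WaitUtil {
  private WaitUtil() {}

  static boolean waitFor(Callable<Boolean> check, long sleepMillis, int maxTries)
      throws Exception {
    int tries = 0;
    while (!check.call() && tries++ < maxTries) {
      Thread.sleep(sleepMillis);   // same pacing as the loops in the test above
    }
    return check.call();           // caller asserts on the result
  }
}

A caller would assert on the returned boolean, mirroring the timeOut counter plus Assert.assertEquals pairs used in the hunks above.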
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
index 4dcb96a561c..dcc9b07cc38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -72,7 +73,7 @@ public class TestTaskImpl {
   private Path remoteJobConfFile;
   private Collection> fsTokens;
   private Clock clock;
-  private Set completedTasksFromPreviousRun;
+  private Map completedTasksFromPreviousRun;
   private MRAppMetrics metrics;
   private TaskImpl mockTask;
   private ApplicationId appId;
@@ -96,7 +97,7 @@ public class TestTaskImpl {
         TaskAttemptListener taskAttemptListener, OutputCommitter committer,
         Token jobToken, Collection> fsTokens, Clock clock,
-        Set completedTasksFromPreviousRun, int startCount,
+        Map completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
index ebdb4160ee8..ddabb4c52f6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
-@SuppressWarnings("deprecation")
 public class TypeConverter {
 
   private static RecordFactory recordFactory;
@@ -116,8 +115,8 @@ public class TypeConverter {
   }
 
   public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) {
-    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()),
-        id.getId());
+    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()),
+        fromYarn(id.getTaskType()), id.getId());
   }
 
   public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 3d904f3a8c2..6fcf1fedd9d 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -148,7 +148,12 @@ public class FifoScheduler implements ResourceScheduler {
     QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
     queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
     queueInfo.setCapacity(1.0f);
-    queueInfo.setCurrentCapacity((float)usedResource.getMemory() / clusterResource.getMemory());
+    if (clusterResource.getMemory() == 0) {
+      queueInfo.setCurrentCapacity(0.0f);
+    } else {
+      queueInfo.setCurrentCapacity((float) usedResource.getMemory()
+          / clusterResource.getMemory());
+    }
     queueInfo.setMaximumCapacity(1.0f);
     queueInfo.setChildQueues(new ArrayList());
     queueInfo.setQueueState(QueueState.RUNNING);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index a50d4c21700..88e6e63e17b 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
@@ -74,6 +75,13 @@ public class TestFifoScheduler {
         .getRMContext());
   }
 
+  @Test
+  public void testFifoSchedulerCapacityWhenNoNMs() {
+    FifoScheduler scheduler = new FifoScheduler();
+    QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
+    Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity());
+  }
+
   @Test
   public void testAppAttemptMetrics() throws Exception {
     AsyncDispatcher dispatcher = new InlineDispatcher();
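The FifoScheduler hunk above guards the capacity calculation because float division by zero in Java yields NaN (0f / 0f is Float.NaN), which is exactly the value the web-services test below stops expecting. A tiny stand-alone illustration of the old and new behaviour, with made-up class and method names rather than the scheduler's own:

// Illustrative only: shows why the guard is needed, not how FifoScheduler is wired.
public class CapacityGuardDemo {
  // Mirrors the guarded calculation: report 0 when no NodeManager memory is registered.
  static float currentCapacity(int usedMemory, int clusterMemory) {
    if (clusterMemory == 0) {
      return 0.0f;
    }
    return (float) usedMemory / clusterMemory;
  }

  public static void main(String[] args) {
    System.out.println(0.0f / 0.0f);                 // NaN -- the old, unguarded result
    System.out.println(currentCapacity(0, 0));       // 0.0 -- the guarded result
    System.out.println(currentCapacity(512, 4096));  // 0.125
  }
}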
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
index fc73c0251a4..bf0df460ac9 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
@@ -539,7 +539,7 @@ public class TestRMWebServices extends JerseyTest {
     assertEquals("type doesn't match", "fifoScheduler", type);
     assertEquals("qstate doesn't match", QueueState.RUNNING.toString(), state);
     assertEquals("capacity doesn't match", 1.0, capacity, 0.0);
-    assertEquals("usedCapacity doesn't match", Float.NaN, usedCapacity, 0.0);
+    assertEquals("usedCapacity doesn't match", 0.0, usedCapacity, 0.0);
     assertEquals("minQueueMemoryCapacity doesn't match", 1024, minQueueCapacity);
     assertEquals("maxQueueMemoryCapacity doesn't match", 10240, maxQueueCapacity);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
index 67db4b13aea..eca68a234bf 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
@@ -437,32 +437,32 @@ Hadoop MapReduce Next Generation - Cluster Setup
   Format a new distributed filesystem:
 
----
-  $ $HADOOP_PREFIX_HOME/bin/hdfs namenode -format
+  $ $HADOOP_PREFIX/bin/hdfs namenode -format
----
 
   Start the HDFS with the following command, run on the designated NameNode:
 
----
-  $ $HADOOP_PREFIX_HOME/bin/hdfs start namenode --config $HADOOP_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
----
 
   Run a script to start DataNodes on all slaves:
 
----
-  $ $HADOOP_PREFIX_HOME/bin/hdfs start datanode --config $HADOOP_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
----
 
   Start the YARN with the following command, run on the designated ResourceManager:
 
----
-  $ $YARN_HOME/bin/yarn start resourcemanager --config $HADOOP_CONF_DIR
+  $ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager
----
 
   Run a script to start NodeManagers on all slaves:
 
----
-  $ $YARN_HOME/bin/yarn start nodemanager --config $HADOOP_CONF_DIR
+  $ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemanager
----
 
   Start a standalone WebAppProxy server. If multiple servers
@@ -476,7 +476,7 @@ Hadoop MapReduce Next Generation - Cluster Setup
   designated server:
 
----
-  $ $YARN_HOME/bin/mapred start historyserver --config $YARN_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver --config $HADOOP_CONF_DIR
----
 
 * Hadoop Shutdown
@@ -485,26 +485,26 @@ Hadoop MapReduce Next Generation - Cluster Setup
   NameNode:
 
----
-  $ $HADOOP_PREFIX_HOME/bin/hdfs stop namenode --config $HADOOP_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs stop namenode
----
 
   Run a script to stop DataNodes on all slaves:
 
----
-  $ $HADOOP_PREFIX_HOME/bin/hdfs stop datanode --config $HADOOP_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs stop datanode
----
 
   Stop the ResourceManager with the following command, run on the designated ResourceManager:
 
----
-  $ $YARN_HOME/bin/yarn stop resourcemanager --config $HADOOP_CONF_DIR
+  $ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop resourcemanager
----
 
   Run a script to stop NodeManagers on all slaves:
 
----
-  $ $YARN_HOME/bin/yarn stop nodemanager --config $HADOOP_CONF_DIR
+  $ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop nodemanager
----
 
   Stop the WebAppProxy server. If multiple servers are used with load
@@ -519,7 +519,7 @@ Hadoop MapReduce Next Generation - Cluster Setup
   designated server:
 
----
-  $ $YARN_HOME/bin/mapred stop historyserver --config $YARN_CONF_DIR
+  $ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh stop historyserver --config $HADOOP_CONF_DIR
----
 
@@ -978,34 +978,34 @@ KVNO Timestamp Principal
   Format a new distributed filesystem as :
 
----
-[hdfs]$ $HADOOP_PREFIX_HOME/bin/hdfs namenode -format
+[hdfs]$ $HADOOP_PREFIX/bin/hdfs namenode -format
----
 
   Start the HDFS with the following command, run on the designated NameNode as :
 
----
-[hdfs]$ $HADOOP_PREFIX_HOME/bin/hdfs start namenode --config $HADOOP_CONF_DIR
+[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
----
 
   Run a script to start DataNodes on all slaves as with a special
   environment variable <<>> set to :
 
----
-[root]$ HADOOP_SECURE_DN_USER=hdfs $HADOOP_PREFIX_HOME/bin/hdfs start datanode --config $HADOOP_CONF_DIR
+[root]$ HADOOP_SECURE_DN_USER=hdfs $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
----
 
   Start the YARN with the following command, run on the designated ResourceManager as :
 
----
-[yarn]$ $YARN_HOME/bin/yarn start resourcemanager --config $HADOOP_CONF_DIR
+[yarn]$ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager
----
 
   Run a script to start NodeManagers on all slaves as :
 
----
-[yarn]$ $YARN_HOME/bin/yarn start nodemanager --config $HADOOP_CONF_DIR
+[yarn]$ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemanager
----
 
   Start a standalone WebAppProxy server. Run on the WebAppProxy
@@ -1020,7 +1020,7 @@ KVNO Timestamp Principal
   designated server as :
 
----
-[mapred]$ $YARN_HOME/bin/mapred start historyserver --config $YARN_CONF_DIR
+[mapred]$ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver --config $HADOOP_CONF_DIR
----
 
 * Hadoop Shutdown
@@ -1029,26 +1029,26 @@ KVNO Timestamp Principal
   as :
 
----
-[hdfs]$ $HADOOP_PREFIX_HOME/bin/hdfs stop namenode --config $HADOOP_CONF_DIR
+[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs stop namenode
----
 
   Run a script to stop DataNodes on all slaves as :
 
----
-[root]$ $HADOOP_PREFIX_HOME/bin/hdfs stop datanode --config $HADOOP_CONF_DIR
+[root]$ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs stop datanode
----
 
   Stop the ResourceManager with the following command, run on the designated ResourceManager as :
 
----
-[yarn]$ $YARN_HOME/bin/yarn stop resourcemanager --config $HADOOP_CONF_DIR
+[yarn]$ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop resourcemanager
----
 
   Run a script to stop NodeManagers on all slaves as :
 
----
-[yarn]$ $YARN_HOME/bin/yarn stop nodemanager --config $HADOOP_CONF_DIR
+[yarn]$ $YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop nodemanager
----
 
   Stop the WebAppProxy server. Run on the WebAppProxy server as
@@ -1063,7 +1063,7 @@ KVNO Timestamp Principal
   designated server as :
 
----
-[mapred]$ $YARN_HOME/bin/mapred stop historyserver --config $YARN_CONF_DIR
+[mapred]$ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh stop historyserver --config $HADOOP_CONF_DIR
----
 
 * {Web Interfaces}
diff --git a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
index 4207ce70b7d..c578635a0d9 100644
--- a/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
+++ b/hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTracker.java
@@ -1192,13 +1192,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       try {
         Path jobInfoFile = getSystemFileForJob(jobId);
         FSDataInputStream in = fs.open(jobInfoFile);
-        JobInfo token = new JobInfo();
+        final JobInfo token = new JobInfo();
         token.readFields(in);
         in.close();
-        UserGroupInformation ugi =
-          UserGroupInformation.createRemoteUser(token.getUser().toString());
-        submitJob(token.getJobID(), restartCount,
-            ugi, token.getJobSubmitDir().toString(), true, null);
+        final UserGroupInformation ugi =
+          UserGroupInformation.createRemoteUser(token.getUser().toString());
+        ugi.doAs(new PrivilegedExceptionAction() {
+          public JobStatus run() throws IOException ,InterruptedException{
+            return submitJob(token.getJobID(), restartCount,
+                ugi, token.getJobSubmitDir().toString(), true, null);
+          }});
+
         recovered++;
       } catch (Exception e) {
         LOG.warn("Could not recover job " + jobId, e);
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a4bd4ce30b3..6c242a09d3c 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -733,6 +733,25 @@
+
+        org.apache.maven.plugins
+        maven-antrun-plugin
+
+
+            create-testdirs
+            validate
+
+              run
+
+
+
+
+
+
+
+
+
+
 
         org.apache.maven.plugins
         maven-enforcer-plugin
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index ce5a4e7320a..27f9b7b9834 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -55,6 +55,7 @@
+