HDFS-6160: Merging 1586007 from trunk to branch-2.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586008 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent f10bf59e6e
commit 0761ef1951
@@ -74,6 +74,8 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6169. Move the address in WebImageViewer. (Akira Ajisaka via wheat9)
 
+    HDFS-6160. TestSafeMode occasionally fails. (Arpit Agarwal)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1017,6 +1017,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
       hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      metrics.incrStorageBlockReportOps();
     }
 
     if (nn.getFSImage().isUpgradeFinalized() &&
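Note that the counter is incremented once per StorageBlockReport, that is, once per storage directory in a DataNode's report rather than once per blockReport RPC. That is what lets the test below decide that a single restarted DataNode has been fully reported: the counter should advance by exactly MiniDFSCluster.DIRS_PER_DATANODE. A minimal sketch of that wait in isolation (it assumes a running MiniDFSCluster whose NameNode was just restarted, so the counter starts from zero; the surrounding test scaffolding is not shown):

    // Sketch only, not part of the patch. Uses the same calls the test adds:
    // MetricsAsserts.getLongCounter/getMetrics and GenericTestUtils.waitFor.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // One increment per reported storage, so the restarted DataNode is
        // fully accounted for once the counter reaches DIRS_PER_DATANODE.
        return getLongCounter("StorageBlockReportOps", getMetrics("NameNodeActivity"))
            == MiniDFSCluster.DIRS_PER_DATANODE;
      }
    }, 10, 10000);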
@@ -73,6 +73,8 @@ public class NameNodeMetrics {
   MutableCounterLong snapshotDiffReportOps;
   @Metric("Number of blockReceivedAndDeleted calls")
   MutableCounterLong blockReceivedAndDeletedOps;
+  @Metric("Number of blockReports from individual storages")
+  MutableCounterLong storageBlockReportOps;
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
@@ -222,6 +224,10 @@ public class NameNodeMetrics {
     blockReceivedAndDeletedOps.incr();
   }
 
+  public void incrStorageBlockReportOps() {
+    storageBlockReportOps.incr();
+  }
+
   public void addTransaction(long latency) {
     transactions.add(latency);
   }
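For context, this follows the usual Hadoop metrics2 pattern: a MutableCounterLong field annotated with @Metric is exported as a counter named after the field (storageBlockReportOps shows up as "StorageBlockReportOps"), and a test can read it back through MetricsAsserts once the source object is registered with the metrics system. The sketch below is a self-contained illustration with a hypothetical ExampleMetrics source; the class, source name, and registration call are illustrative assumptions, not part of this patch.

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.junit.Test;

// Hypothetical metrics source; NameNodeMetrics plays this role in the patch.
@Metrics(name = "ExampleActivity", about = "Example metrics", context = "test")
class ExampleMetrics {
  @Metric("Number of foo operations")
  MutableCounterLong fooOps;          // exported as the counter "FooOps"

  void incrFooOps() {
    fooOps.incr();
  }
}

public class TestExampleMetrics {
  @Test
  public void testCounterRoundTrip() {
    DefaultMetricsSystem.initialize("test");
    ExampleMetrics metrics = DefaultMetricsSystem.instance()
        .register("ExampleActivity", "Example metrics", new ExampleMetrics());

    metrics.incrFooOps();
    metrics.incrFooOps();

    // Same read path TestSafeMode uses for "StorageBlockReportOps" on the
    // "NameNodeActivity" source.
    assertEquals(2L, getLongCounter("FooOps", getMetrics("ExampleActivity")));
  }
}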
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +28,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -55,12 +59,14 @@ import com.google.common.collect.Lists;
  * Tests to verify safe mode correctness.
  */
 public class TestSafeMode {
+  public static final Log LOG = LogFactory.getLog(TestSafeMode.class);
   private static final Path TEST_PATH = new Path("/test");
   private static final int BLOCK_SIZE = 1024;
   Configuration conf;
   MiniDFSCluster cluster;
   FileSystem fs;
   DistributedFileSystem dfs;
+  private static final String NN_METRICS = "NameNodeActivity";
 
   @Before
   public void startUp() throws IOException {
@@ -158,6 +164,7 @@ public class TestSafeMode {
    */
   @Test(timeout=45000)
   public void testInitializeReplQueuesEarly() throws Exception {
+    LOG.info("Starting testInitializeReplQueuesEarly");
     // Spray the blocks around the cluster when we add DNs instead of
     // concentrating all blocks on the first node.
     BlockManagerTestUtil.setWritingPrefersLocalNode(
@@ -165,9 +172,11 @@
 
     cluster.startDataNodes(conf, 2, true, StartupOption.REGULAR, null);
     cluster.waitActive();
 
+    LOG.info("Creating files");
     DFSTestUtil.createFile(fs, TEST_PATH, 15*BLOCK_SIZE, (short)1, 1L);
 
+    LOG.info("Stopping all DataNodes");
     List<DataNodeProperties> dnprops = Lists.newLinkedList();
     dnprops.add(cluster.stopDataNode(0));
     dnprops.add(cluster.stopDataNode(0));
@@ -176,6 +185,7 @@ public class TestSafeMode {
     cluster.getConfiguration(0).setFloat(
         DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 1f/15f);
 
+    LOG.info("Restarting NameNode");
     cluster.restartNameNode();
     final NameNode nn = cluster.getNameNode();
 
@@ -189,27 +199,37 @@ public class TestSafeMode {
         "until threshold is crossed",
         NameNodeAdapter.safeModeInitializedReplQueues(nn));
 
+    LOG.info("Restarting one DataNode");
     cluster.restartDataNode(dnprops.remove(0));
 
-    // Wait for the block report from the restarted DN to come in.
+    // Wait for block reports from all attached storages of
+    // the restarted DN to come in.
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        return NameNodeAdapter.getSafeModeSafeBlocks(nn) > 0;
+        return getLongCounter("StorageBlockReportOps", getMetrics(NN_METRICS)) ==
+            MiniDFSCluster.DIRS_PER_DATANODE;
       }
     }, 10, 10000);
-    // SafeMode is fine-grain synchronized, so the processMisReplicatedBlocks
-    // call is still going on at this point - wait until it's done by grabbing
-    // the lock.
-    nn.getNamesystem().writeLock();
-    nn.getNamesystem().writeUnlock();
-    int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
-    assertTrue("Expected first block report to make some but not all blocks " +
-        "safe. Got: " + safe, safe >= 1 && safe < 15);
-    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+
+    final int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
+    assertTrue("Expected first block report to make some blocks safe.", safe > 0);
+    assertTrue("Did not expect first block report to make all blocks safe.", safe < 15);
 
     assertTrue(NameNodeAdapter.safeModeInitializedReplQueues(nn));
-    assertEquals(15 - safe, nn.getNamesystem().getUnderReplicatedBlocks());
+
+    // Ensure that UnderReplicatedBlocks goes up to 15 - safe. Misreplicated
+    // blocks are processed asynchronously so this may take a few seconds.
+    // Failure here will manifest as a test timeout.
+    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+    long underReplicatedBlocks = nn.getNamesystem().getUnderReplicatedBlocks();
+    while (underReplicatedBlocks != (15 - safe)) {
+      LOG.info("UnderReplicatedBlocks expected=" + (15 - safe) +
+               ", actual=" + underReplicatedBlocks);
+      Thread.sleep(100);
+      BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+      underReplicatedBlocks = nn.getNamesystem().getUnderReplicatedBlocks();
+    }
 
     cluster.restartDataNodes();
   }
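The trailing `}, 10, 10000)` arguments are GenericTestUtils.waitFor's poll interval and deadline in milliseconds: the Supplier is re-evaluated every 10 ms and the call fails with a TimeoutException after 10 seconds if the condition never becomes true. A standalone sketch of that contract, with a hypothetical condition unrelated to HDFS:

// Sketch only (hypothetical demo class), not part of the patch.
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;

public class WaitForDemo {
  public static void main(String[] args) throws Exception {
    final AtomicBoolean done = new AtomicBoolean(false);

    // Flip the condition from another thread after roughly half a second.
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(500);
        } catch (InterruptedException ignored) {
        }
        done.set(true);
      }
    }).start();

    // Poll every 10 ms; give up with a TimeoutException after 10 seconds.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return done.get();
      }
    }, 10, 10000);
  }
}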