diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2d7d1706e6b..77c5974ccf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2499,6 +2499,8 @@ Release 2.8.0 - UNRELEASED HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured image loaded and namenode started. (Brahma Reddy Battula via mingma) + HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 929672629b4..80c6c456693 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -35,6 +35,11 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -100,6 +105,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; @@ -201,6 +207,10 @@ public class BlockManager implements BlockStatsMXBean { /** Replication thread. */ final Daemon replicationThread = new Daemon(new ReplicationMonitor()); + /** Block report thread for handling async reports. */ + private final BlockReportProcessingThread blockReportThread = + new BlockReportProcessingThread(); + /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); @@ -506,6 +516,7 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.activate(conf); this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.start(); + this.blockReportThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); bmSafeMode.activate(blockTotal); } @@ -514,7 +525,9 @@ public class BlockManager implements BlockStatsMXBean { bmSafeMode.close(); try { replicationThread.interrupt(); + blockReportThread.interrupt(); replicationThread.join(3000); + blockReportThread.join(3000); } catch (InterruptedException ie) { } datanodeManager.close(); @@ -2042,7 +2055,7 @@ public class BlockManager implements BlockStatsMXBean { try { node = datanodeManager.getDatanode(nodeID); - if (node == null || !node.isAlive()) { + if (node == null || !node.isRegistered()) { throw new IOException( "ProcessReport from dead or unregistered node: " + nodeID); } @@ -3550,17 +3563,23 @@ public class BlockManager implements BlockStatsMXBean { public void processIncrementalBlockReport(final DatanodeID nodeID, final StorageReceivedDeletedBlocks srdb) throws IOException { assert namesystem.hasWriteLock(); - int received = 0; - int deleted = 0; - int receiving = 0; final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); - if (node == null || !node.isAlive()) { + if (node == null || !node.isRegistered()) { blockLog.warn("BLOCK* processIncrementalBlockReport" + " is received from dead or unregistered node {}", nodeID); throw new IOException( "Got incremental block report from unregistered or dead node"); } + try { + processIncrementalBlockReport(node, srdb); + } catch (Exception ex) { + node.setForceRegistration(true); + throw ex; + } + } + private void processIncrementalBlockReport(final DatanodeDescriptor node, + final StorageReceivedDeletedBlocks srdb) throws IOException { DatanodeStorageInfo storageInfo = node.getStorageInfo(srdb.getStorage().getStorageID()); if (storageInfo == null) { @@ -3572,6 +3591,10 @@ public class BlockManager implements BlockStatsMXBean { storageInfo = node.updateStorage(srdb.getStorage()); } + int received = 0; + int deleted = 0; + int receiving = 0; + for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { switch (rdbi.getStatus()) { case DELETED_BLOCK: @@ -3589,17 +3612,17 @@ public class BlockManager implements BlockStatsMXBean { break; default: String msg = - "Unknown block status code reported by " + nodeID + + "Unknown block status code reported by " + node + ": " + rdbi; blockLog.warn(msg); assert false : msg; // if assertions are enabled, throw. break; } blockLog.debug("BLOCK* block {}: {} is received from {}", - rdbi.getStatus(), rdbi.getBlock(), nodeID); + rdbi.getStatus(), rdbi.getBlock(), node); } blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from " - + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving, + + "{} receiving: {}, received: {}, deleted: {}", node, receiving, received, deleted); } @@ -4266,4 +4289,119 @@ public class BlockManager implements BlockStatsMXBean { return false; } + // async processing of an action, used for IBRs. + public void enqueueBlockOp(final Runnable action) throws IOException { + try { + blockReportThread.enqueue(action); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + + // sync batch processing for a full BR. + public T runBlockOp(final Callable action) + throws IOException { + final FutureTask future = new FutureTask(action); + enqueueBlockOp(future); + try { + return future.get(); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause == null) { + cause = ee; + } + if (!(cause instanceof IOException)) { + cause = new IOException(cause); + } + throw (IOException)cause; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException(ie); + } + } + + @VisibleForTesting + public void flushBlockOps() throws IOException { + runBlockOp(new Callable(){ + @Override + public Void call() { + return null; + } + }); + } + + public int getBlockOpQueueLength() { + return blockReportThread.queue.size(); + } + + private class BlockReportProcessingThread extends Thread { + private static final long MAX_LOCK_HOLD_MS = 4; + private long lastFull = 0; + + private final BlockingQueue queue = + new ArrayBlockingQueue(1024); + + BlockReportProcessingThread() { + super("Block report processor"); + setDaemon(true); + } + + @Override + public void run() { + try { + processQueue(); + } catch (Throwable t) { + ExitUtil.terminate(1, + getName() + " encountered fatal exception: " + t); + } + } + + private void processQueue() { + while (namesystem.isRunning()) { + NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); + try { + Runnable action = queue.take(); + // batch as many operations in the write lock until the queue + // runs dry, or the max lock hold is reached. + int processed = 0; + namesystem.writeLock(); + metrics.setBlockOpsQueued(queue.size() + 1); + try { + long start = Time.monotonicNow(); + do { + processed++; + action.run(); + if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) { + break; + } + action = queue.poll(); + } while (action != null); + } finally { + namesystem.writeUnlock(); + metrics.addBlockOpsBatched(processed - 1); + } + } catch (InterruptedException e) { + // ignore unless thread was specifically interrupted. + if (Thread.interrupted()) { + break; + } + } + } + queue.clear(); + } + + void enqueue(Runnable action) throws InterruptedException { + if (!queue.offer(action)) { + if (!isAlive() && namesystem.isRunning()) { + ExitUtil.terminate(1, getName()+" is not running"); + } + long now = Time.monotonicNow(); + if (now - lastFull > 4000) { + lastFull = now; + LOG.info("Block report queue is full"); + } + queue.put(action); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 6709390d008..fde0215a3ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -190,7 +190,8 @@ public class DatanodeDescriptor extends DatanodeInfo { // This is an optimization, because contains takes O(n) time on Arraylist private boolean isAlive = false; private boolean needKeyUpdate = false; - + private boolean forceRegistration = false; + // A system administrator can tune the balancer bandwidth parameter // (dfs.balance.bandwidthPerSec) dynamically by calling // "dfsadmin -setBalanacerBandwidth ", at which point the @@ -863,6 +864,7 @@ public class DatanodeDescriptor extends DatanodeInfo { storage.setBlockReportCount(0); } heartbeatedSinceRegistration = false; + forceRegistration = false; } /** @@ -949,6 +951,14 @@ public class DatanodeDescriptor extends DatanodeInfo { return false; } return true; - } + } + + public void setForceRegistration(boolean force) { + forceRegistration = force; + } + + public boolean isRegistered() { + return isAlive() && !forceRegistration; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index d535397de0a..cdd5b9e21e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1418,7 +1418,7 @@ public class DatanodeManager { throw new DisallowedDatanodeException(nodeinfo); } - if (nodeinfo == null || !nodeinfo.isAlive()) { + if (nodeinfo == null || !nodeinfo.isRegistered()) { return new DatanodeCommand[]{RegisterCommand.REGISTER}; } heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index bb9a706d32d..1f1b24b12c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -152,7 +152,7 @@ public class DatanodeStorageInfo { this.state = s.getState(); } - int getBlockReportCount() { + public int getBlockReportCount() { return blockReportCount; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 4fd9ca84fa5..b1f936bc4d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -934,7 +934,7 @@ public final class CacheManager { try { final DatanodeDescriptor datanode = blockManager.getDatanodeManager().getDatanode(datanodeID); - if (datanode == null || !datanode.isAlive()) { + if (datanode == null || !datanode.isRegistered()) { throw new IOException( "processCacheReport from dead or unregistered datanode: " + datanode); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 490f3e403e9..67f13869dd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -36,6 +36,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import com.google.common.collect.Lists; @@ -1365,9 +1366,9 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // DatanodeProtocol - public DatanodeCommand blockReport(DatanodeRegistration nodeReg, - String poolId, StorageBlockReport[] reports, - BlockReportContext context) throws IOException { + public DatanodeCommand blockReport(final DatanodeRegistration nodeReg, + String poolId, final StorageBlockReport[] reports, + final BlockReportContext context) throws IOException { checkNNStartup(); verifyRequest(nodeReg); if(blockStateChangeLog.isDebugEnabled()) { @@ -1383,8 +1384,14 @@ class NameNodeRpcServer implements NamenodeProtocols { // for the same node and storage, so the value returned by the last // call of this loop is the final updated value for noStaleStorage. // - noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(), - blocks, context, (r == reports.length - 1)); + final int index = r; + noStaleStorages = bm.runBlockOp(new Callable() { + @Override + public Boolean call() throws IOException { + return bm.processReport(nodeReg, reports[index].getStorage(), + blocks, context, (index == reports.length - 1)); + } + }); metrics.incrStorageBlockReportOps(); } BlockManagerFaultInjector.getInstance(). @@ -1414,8 +1421,9 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // DatanodeProtocol - public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, - StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { + public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg, + String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) + throws IOException { checkNNStartup(); verifyRequest(nodeReg); metrics.incrBlockReceivedAndDeletedOps(); @@ -1424,8 +1432,22 @@ class NameNodeRpcServer implements NamenodeProtocols { +"from "+nodeReg+" "+receivedAndDeletedBlocks.length +" blocks."); } - for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { - namesystem.processIncrementalBlockReport(nodeReg, r); + final BlockManager bm = namesystem.getBlockManager(); + for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { + bm.enqueueBlockOp(new Runnable() { + @Override + public void run() { + try { + namesystem.processIncrementalBlockReport(nodeReg, r); + } catch (Exception ex) { + // usually because the node is unregistered/dead. next heartbeat + // will correct the problem + blockStateChangeLog.error( + "*BLOCK* NameNode.blockReceivedAndDeleted: " + + "failed from " + nodeReg + ": " + ex.getMessage()); + } + } + }); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index 31bc16479b3..54b5c6ebf4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -76,6 +76,10 @@ public class NameNodeMetrics { MutableCounterLong blockReceivedAndDeletedOps; @Metric("Number of blockReports from individual storages") MutableCounterLong storageBlockReportOps; + @Metric("Number of blockReports and blockReceivedAndDeleted queued") + MutableGaugeInt blockOpsQueued; + @Metric("Number of blockReports and blockReceivedAndDeleted batch processed") + MutableCounterLong blockOpsBatched; @Metric("Number of file system operations") public long totalFileOps(){ @@ -267,6 +271,14 @@ public class NameNodeMetrics { storageBlockReportOps.incr(); } + public void setBlockOpsQueued(int size) { + blockOpsQueued.set(size); + } + + public void addBlockOpsBatched(int count) { + blockOpsBatched.incr(count); + } + public void addTransaction(long latency) { transactions.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java index 211e6aac85b..587032183c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java @@ -20,21 +20,32 @@ package org.apache.hadoop.hdfs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.VersionInfo; import org.junit.Test; +import com.google.common.base.Supplier; + import java.net.InetSocketAddress; import java.security.Permission; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.*; import static org.mockito.Mockito.doReturn; @@ -307,4 +318,131 @@ public class TestDatanodeRegistration { } } } -} + + // IBRs are async operations to free up IPC handlers. This means the IBR + // response will not contain non-IPC level exceptions - which in practice + // should not occur other than dead/unregistered node which will trigger a + // re-registration. If a non-IPC exception does occur, the safety net is + // a forced re-registration on the next heartbeat. + @Test(timeout=10000) + public void testForcedRegistration() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE); + + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + cluster.getHttpUri(0); + FSNamesystem fsn = cluster.getNamesystem(); + String bpId = fsn.getBlockPoolId(); + + DataNode dn = cluster.getDataNodes().get(0); + DatanodeDescriptor dnd = + NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId()); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + DatanodeStorageInfo storage = dnd.getStorageInfos()[0]; + + // registration should not change after heartbeat. + assertTrue(dnd.isRegistered()); + DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId); + waitForHeartbeat(dn, dnd); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + + // force a re-registration on next heartbeat. + dnd.setForceRegistration(true); + assertFalse(dnd.isRegistered()); + waitForHeartbeat(dn, dnd); + assertTrue(dnd.isRegistered()); + DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId); + assertNotSame(lastReg, newReg); + lastReg = newReg; + + // registration should not change on subsequent heartbeats. + waitForHeartbeat(dn, dnd); + assertTrue(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + assertTrue(waitForBlockReport(dn, dnd)); + assertTrue(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + + // check that block report is not processed and registration didn't change. + dnd.setForceRegistration(true); + assertFalse(waitForBlockReport(dn, dnd)); + assertFalse(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + + // heartbeat should trigger re-registration, and next block report should + // not change registration. + waitForHeartbeat(dn, dnd); + assertTrue(dnd.isRegistered()); + newReg = dn.getDNRegistrationForBP(bpId); + assertNotSame(lastReg, newReg); + lastReg = newReg; + assertTrue(waitForBlockReport(dn, dnd)); + assertTrue(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + + // registration doesn't change. + ExtendedBlock eb = new ExtendedBlock(bpId, 1234); + dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID()); + DataNodeTestUtils.triggerDeletionReport(dn); + assertTrue(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + + // a failed IBR will effectively unregister the node. + boolean failed = false; + try { + // pass null to cause a failure since there aren't any easy failure + // modes since it shouldn't happen. + fsn.processIncrementalBlockReport(lastReg, null); + } catch (NullPointerException npe) { + failed = true; + } + assertTrue("didn't fail", failed); + assertFalse(dnd.isRegistered()); + + // should remain unregistered until next heartbeat. + dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID()); + DataNodeTestUtils.triggerDeletionReport(dn); + assertFalse(dnd.isRegistered()); + assertSame(lastReg, dn.getDNRegistrationForBP(bpId)); + waitForHeartbeat(dn, dnd); + assertTrue(dnd.isRegistered()); + assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId)); + } + + private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd) + throws Exception { + final long lastUpdate = dnd.getLastUpdateMonotonic(); + Thread.sleep(1); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + DataNodeTestUtils.triggerHeartbeat(dn); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return lastUpdate != dnd.getLastUpdateMonotonic(); + } + }, 10, 100000); + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + + private boolean waitForBlockReport(final DataNode dn, + final DatanodeDescriptor dnd) throws Exception { + final DatanodeStorageInfo storage = dnd.getStorageInfos()[0]; + final long lastCount = storage.getBlockReportCount(); + dn.triggerBlockReport( + new BlockReportOptions.Factory().setIncremental(false).build()); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return lastCount != storage.getBlockReportCount(); + } + }, 10, 100); + } catch (TimeoutException te) { + return false; + } + return true; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 5df73cecf7c..411a759e4df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -34,8 +35,20 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; @@ -60,8 +73,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -832,4 +849,158 @@ public class TestBlockManager { Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks, null, excessTypes)); } -} + + @Test + public void testBlockReportQueueing() throws Exception { + Configuration conf = new HdfsConfiguration(); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + final FSNamesystem fsn = cluster.getNamesystem(); + final BlockManager bm = fsn.getBlockManager(); + final ExecutorService executor = Executors.newCachedThreadPool(); + + final CyclicBarrier startBarrier = new CyclicBarrier(2); + final CountDownLatch endLatch = new CountDownLatch(3); + + // create a task intended to block while processing, thus causing + // the queue to backup. simulates how a full BR is processed. + FutureTask blockingOp = new FutureTask( + new Callable(){ + @Override + public Void call() throws IOException { + return bm.runBlockOp(new Callable() { + @Override + public Void call() + throws InterruptedException, BrokenBarrierException { + // use a barrier to control the blocking. + startBarrier.await(); + endLatch.countDown(); + return null; + } + }); + } + }); + + // create an async task. simulates how an IBR is processed. + Callable asyncOp = new Callable(){ + @Override + public Void call() throws IOException { + bm.enqueueBlockOp(new Runnable() { + @Override + public void run() { + // use the latch to signal if the op has run. + endLatch.countDown(); + } + }); + return null; + } + }; + + // calling get forces its execution so we can test if it's blocked. + Future blockedFuture = executor.submit(blockingOp); + boolean isBlocked = false; + try { + // wait 1s for the future to block. it should run instantaneously. + blockedFuture.get(1, TimeUnit.SECONDS); + } catch (TimeoutException te) { + isBlocked = true; + } + assertTrue(isBlocked); + + // should effectively return immediately since calls are queued. + // however they should be backed up in the queue behind the blocking + // operation. + executor.submit(asyncOp).get(1, TimeUnit.SECONDS); + executor.submit(asyncOp).get(1, TimeUnit.SECONDS); + + // check the async calls are queued, and first is still blocked. + assertEquals(2, bm.getBlockOpQueueLength()); + assertFalse(blockedFuture.isDone()); + + // unblock the queue, wait for last op to complete, check the blocked + // call has returned + startBarrier.await(1, TimeUnit.SECONDS); + assertTrue(endLatch.await(1, TimeUnit.SECONDS)); + assertEquals(0, bm.getBlockOpQueueLength()); + assertTrue(blockingOp.isDone()); + } finally { + cluster.shutdown(); + } + } + + // spam the block manager with IBRs to verify queuing is occurring. + @Test + public void testAsyncIBR() throws Exception { + Logger.getRootLogger().setLevel(Level.WARN); + + // will create files with many small blocks. + final int blkSize = 4*1024; + final int fileSize = blkSize * 100; + final byte[] buf = new byte[2*blkSize]; + final int numWriters = 4; + final int repl = 3; + + final CyclicBarrier barrier = new CyclicBarrier(numWriters); + final CountDownLatch writeLatch = new CountDownLatch(numWriters); + final AtomicBoolean failure = new AtomicBoolean(); + + final Configuration conf = new HdfsConfiguration(); + conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize); + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(8).build(); + + try { + cluster.waitActive(); + // create multiple writer threads to create a file with many blocks. + // will test that concurrent writing causes IBR batching in the NN + Thread[] writers = new Thread[numWriters]; + for (int i=0; i < writers.length; i++) { + final Path p = new Path("/writer"+i); + writers[i] = new Thread(new Runnable() { + @Override + public void run() { + try { + FileSystem fs = cluster.getFileSystem(); + FSDataOutputStream os = + fs.create(p, true, buf.length, (short)repl, blkSize); + // align writers for maximum chance of IBR batching. + barrier.await(); + int remaining = fileSize; + while (remaining > 0) { + os.write(buf); + remaining -= buf.length; + } + os.close(); + } catch (Exception e) { + e.printStackTrace(); + failure.set(true); + } + // let main thread know we are done. + writeLatch.countDown(); + } + }); + writers[i].start(); + } + + // when and how many IBRs are queued is indeterminate, so just watch + // the metrics and verify something was queued at during execution. + boolean sawQueued = false; + while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) { + assertFalse(failure.get()); + MetricsRecordBuilder rb = getMetrics("NameNodeActivity"); + long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb); + sawQueued |= (queued > 0); + } + assertFalse(failure.get()); + assertTrue(sawQueued); + + // verify that batching of the IBRs occurred. + MetricsRecordBuilder rb = getMetrics("NameNodeActivity"); + long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb); + assertTrue(batched > 0); + } finally { + cluster.shutdown(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java index 3d399a2721a..b5b0cf21df3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java @@ -304,7 +304,8 @@ public class TestPendingReplication { reportDnNum++; } } - + // IBRs are async, make sure the NN processes all of them. + cluster.getNamesystem().getBlockManager().flushBlockOps(); assertEquals(DATANODE_COUNT - 3, blkManager.pendingReplications.getNumReplicas(blocks[0])); @@ -322,6 +323,7 @@ public class TestPendingReplication { } } + cluster.getNamesystem().getBlockManager().flushBlockOps(); assertEquals(DATANODE_COUNT - 3, blkManager.pendingReplications.getNumReplicas(blocks[0])); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index c4a2d06c0e9..0a57005ab69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -113,9 +113,13 @@ public abstract class BlockReportTestBase { @After public void shutDownCluster() throws IOException { - fs.close(); - cluster.shutdownDataNodes(); - cluster.shutdown(); + if (fs != null) { + fs.close(); + } + if (cluster != null) { + cluster.shutdownDataNodes(); + cluster.shutdown(); + } } protected static void resetConfiguration() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java index d8c651f5fe9..0944396807b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java @@ -26,7 +26,6 @@ import static org.junit.Assert.*; import java.io.IOException; import java.net.InetSocketAddress; import java.util.UUID; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -187,7 +186,9 @@ public class TestIncrementalBrVariations { } // Make sure that the deleted block from each storage was picked up - // by the NameNode. + // by the NameNode. IBRs are async, make sure the NN processes + // all of them. + cluster.getNamesystem().getBlockManager().flushBlockOps(); assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length)); } @@ -248,7 +249,8 @@ public class TestIncrementalBrVariations { // Send the report to the NN. cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports); - + // IBRs are async, make sure the NN processes all of them. + cluster.getNamesystem().getBlockManager().flushBlockOps(); // Make sure that the NN has learned of the new storage. DatanodeStorageInfo storageInfo = cluster.getNameNode() .getNamesystem() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index ec3d924264a..875ba4a224d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -100,14 +100,14 @@ public class TestDeadDatanode { null) }; StorageReceivedDeletedBlocks[] storageBlocks = { new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) }; - - // Ensure blockReceived call from dead datanode is rejected with IOException - try { - dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks); - fail("Expected IOException is not thrown"); - } catch (IOException ex) { - // Expected - } + + // Ensure blockReceived call from dead datanode is not rejected with + // IOException, since it's async, but the node remains unregistered. + dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks); + BlockManager bm = cluster.getNamesystem().getBlockManager(); + // IBRs are async, make sure the NN processes all of them. + bm.flushBlockOps(); + assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered()); // Ensure blockReport from dead datanode is rejected with IOException StorageBlockReport[] report = { new StorageBlockReport(