HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)

(cherry picked from commit f741476146)
This commit is contained in:
Uma Mahesh 2015-12-16 18:16:39 -08:00
parent e82135df87
commit 3be39d0229
14 changed files with 542 additions and 40 deletions

View File

@ -1637,6 +1637,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured
image loaded and namenode started. (Brahma Reddy Battula via mingma) image loaded and namenode started. (Brahma Reddy Battula via mingma)
HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -34,6 +34,11 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -91,6 +96,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -192,6 +198,10 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */ /** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor()); final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
new BlockReportProcessingThread();
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@ -483,6 +493,7 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.activate(conf); datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start(); this.replicationThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this); mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal); bmSafeMode.activate(blockTotal);
} }
@ -491,7 +502,9 @@ public class BlockManager implements BlockStatsMXBean {
bmSafeMode.close(); bmSafeMode.close();
try { try {
replicationThread.interrupt(); replicationThread.interrupt();
blockReportThread.interrupt();
replicationThread.join(3000); replicationThread.join(3000);
blockReportThread.join(3000);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
} }
datanodeManager.close(); datanodeManager.close();
@ -1877,7 +1890,7 @@ public class BlockManager implements BlockStatsMXBean {
try { try {
node = datanodeManager.getDatanode(nodeID); node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive()) { if (node == null || !node.isRegistered()) {
throw new IOException( throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID); "ProcessReport from dead or unregistered node: " + nodeID);
} }
@ -3229,17 +3242,23 @@ public class BlockManager implements BlockStatsMXBean {
public void processIncrementalBlockReport(final DatanodeID nodeID, public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException { final StorageReceivedDeletedBlocks srdb) throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive()) { if (node == null || !node.isRegistered()) {
blockLog.warn("BLOCK* processIncrementalBlockReport" blockLog.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node {}", nodeID); + " is received from dead or unregistered node {}", nodeID);
throw new IOException( throw new IOException(
"Got incremental block report from unregistered or dead node"); "Got incremental block report from unregistered or dead node");
} }
try {
processIncrementalBlockReport(node, srdb);
} catch (Exception ex) {
node.setForceRegistration(true);
throw ex;
}
}
private void processIncrementalBlockReport(final DatanodeDescriptor node,
final StorageReceivedDeletedBlocks srdb) throws IOException {
DatanodeStorageInfo storageInfo = DatanodeStorageInfo storageInfo =
node.getStorageInfo(srdb.getStorage().getStorageID()); node.getStorageInfo(srdb.getStorage().getStorageID());
if (storageInfo == null) { if (storageInfo == null) {
@ -3251,6 +3270,10 @@ public class BlockManager implements BlockStatsMXBean {
storageInfo = node.updateStorage(srdb.getStorage()); storageInfo = node.updateStorage(srdb.getStorage());
} }
int received = 0;
int deleted = 0;
int receiving = 0;
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) { switch (rdbi.getStatus()) {
case DELETED_BLOCK: case DELETED_BLOCK:
@ -3268,17 +3291,17 @@ public class BlockManager implements BlockStatsMXBean {
break; break;
default: default:
String msg = String msg =
"Unknown block status code reported by " + nodeID + "Unknown block status code reported by " + node +
": " + rdbi; ": " + rdbi;
blockLog.warn(msg); blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw. assert false : msg; // if assertions are enabled, throw.
break; break;
} }
blockLog.debug("BLOCK* block {}: {} is received from {}", blockLog.debug("BLOCK* block {}: {} is received from {}",
rdbi.getStatus(), rdbi.getBlock(), nodeID); rdbi.getStatus(), rdbi.getBlock(), node);
} }
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from " blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
+ "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving, + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
received, deleted); received, deleted);
} }
@ -3879,4 +3902,119 @@ public class BlockManager implements BlockStatsMXBean {
return false; return false;
} }
// async processing of an action, used for IBRs.
public void enqueueBlockOp(final Runnable action) throws IOException {
try {
blockReportThread.enqueue(action);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
// sync batch processing for a full BR.
public <T> T runBlockOp(final Callable<T> action)
throws IOException {
final FutureTask<T> future = new FutureTask<T>(action);
enqueueBlockOp(future);
try {
return future.get();
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause == null) {
cause = ee;
}
if (!(cause instanceof IOException)) {
cause = new IOException(cause);
}
throw (IOException)cause;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException(ie);
}
}
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@Override
public Void call() {
return null;
}
});
}
public int getBlockOpQueueLength() {
return blockReportThread.queue.size();
}
private class BlockReportProcessingThread extends Thread {
private static final long MAX_LOCK_HOLD_MS = 4;
private long lastFull = 0;
private final BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<Runnable>(1024);
BlockReportProcessingThread() {
super("Block report processor");
setDaemon(true);
}
@Override
public void run() {
try {
processQueue();
} catch (Throwable t) {
ExitUtil.terminate(1,
getName() + " encountered fatal exception: " + t);
}
}
private void processQueue() {
while (namesystem.isRunning()) {
NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
try {
Runnable action = queue.take();
// batch as many operations in the write lock until the queue
// runs dry, or the max lock hold is reached.
int processed = 0;
namesystem.writeLock();
metrics.setBlockOpsQueued(queue.size() + 1);
try {
long start = Time.monotonicNow();
do {
processed++;
action.run();
if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
break;
}
action = queue.poll();
} while (action != null);
} finally {
namesystem.writeUnlock();
metrics.addBlockOpsBatched(processed - 1);
}
} catch (InterruptedException e) {
// ignore unless thread was specifically interrupted.
if (Thread.interrupted()) {
break;
}
}
}
queue.clear();
}
void enqueue(Runnable action) throws InterruptedException {
if (!queue.offer(action)) {
if (!isAlive() && namesystem.isRunning()) {
ExitUtil.terminate(1, getName()+" is not running");
}
long now = Time.monotonicNow();
if (now - lastFull > 4000) {
lastFull = now;
LOG.info("Block report queue is full");
}
queue.put(action);
}
}
}
} }

View File

@ -189,6 +189,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
// This is an optimization, because contains takes O(n) time on Arraylist // This is an optimization, because contains takes O(n) time on Arraylist
private boolean isAlive = false; private boolean isAlive = false;
private boolean needKeyUpdate = false; private boolean needKeyUpdate = false;
private boolean forceRegistration = false;
// A system administrator can tune the balancer bandwidth parameter // A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling // (dfs.balance.bandwidthPerSec) dynamically by calling
@ -301,7 +302,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
} }
} }
DatanodeStorageInfo[] getStorageInfos() { @VisibleForTesting
public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) { synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values(); final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]); return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@ -824,6 +826,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
storage.setBlockReportCount(0); storage.setBlockReportCount(0);
} }
heartbeatedSinceRegistration = false; heartbeatedSinceRegistration = false;
forceRegistration = false;
} }
/** /**
@ -906,6 +909,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false; return false;
} }
return true; return true;
} }
public void setForceRegistration(boolean force) {
forceRegistration = force;
}
public boolean isRegistered() {
return isAlive() && !forceRegistration;
}
} }

View File

@ -1412,7 +1412,7 @@ public class DatanodeManager {
throw new DisallowedDatanodeException(nodeinfo); throw new DisallowedDatanodeException(nodeinfo);
} }
if (nodeinfo == null || !nodeinfo.isAlive()) { if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER}; return new DatanodeCommand[]{RegisterCommand.REGISTER};
} }
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity, heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,

View File

@ -151,7 +151,7 @@ public class DatanodeStorageInfo {
this.state = s.getState(); this.state = s.getState();
} }
int getBlockReportCount() { public int getBlockReportCount() {
return blockReportCount; return blockReportCount;
} }

View File

@ -934,7 +934,7 @@ public final class CacheManager {
try { try {
final DatanodeDescriptor datanode = final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID); blockManager.getDatanodeManager().getDatanode(datanodeID);
if (datanode == null || !datanode.isAlive()) { if (datanode == null || !datanode.isRegistered()) {
throw new IOException( throw new IOException(
"processCacheReport from dead or unregistered datanode: " + "processCacheReport from dead or unregistered datanode: " +
datanode); datanode);

View File

@ -36,6 +36,7 @@ import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -1358,9 +1359,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg, public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports, String poolId, final StorageBlockReport[] reports,
BlockReportContext context) throws IOException { final BlockReportContext context) throws IOException {
checkNNStartup(); checkNNStartup();
verifyRequest(nodeReg); verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) { if(blockStateChangeLog.isDebugEnabled()) {
@ -1376,8 +1377,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// for the same node and storage, so the value returned by the last // for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage. // call of this loop is the final updated value for noStaleStorage.
// //
noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(), final int index = r;
blocks, context, (r == reports.length - 1)); noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context, (index == reports.length - 1));
}
});
metrics.incrStorageBlockReportOps(); metrics.incrStorageBlockReportOps();
} }
BlockManagerFaultInjector.getInstance(). BlockManagerFaultInjector.getInstance().
@ -1407,8 +1414,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override // DatanodeProtocol @Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException {
checkNNStartup(); checkNNStartup();
verifyRequest(nodeReg); verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps(); metrics.incrBlockReceivedAndDeletedOps();
@ -1417,8 +1425,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks."); +" blocks.");
} }
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { final BlockManager bm = namesystem.getBlockManager();
namesystem.processIncrementalBlockReport(nodeReg, r); for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
try {
namesystem.processIncrementalBlockReport(nodeReg, r);
} catch (Exception ex) {
// usually because the node is unregistered/dead. next heartbeat
// will correct the problem
blockStateChangeLog.error(
"*BLOCK* NameNode.blockReceivedAndDeleted: "
+ "failed from " + nodeReg + ": " + ex.getMessage());
}
}
});
} }
} }

View File

@ -76,6 +76,10 @@ public class NameNodeMetrics {
MutableCounterLong blockReceivedAndDeletedOps; MutableCounterLong blockReceivedAndDeletedOps;
@Metric("Number of blockReports from individual storages") @Metric("Number of blockReports from individual storages")
MutableCounterLong storageBlockReportOps; MutableCounterLong storageBlockReportOps;
@Metric("Number of blockReports and blockReceivedAndDeleted queued")
MutableGaugeInt blockOpsQueued;
@Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
MutableCounterLong blockOpsBatched;
@Metric("Number of file system operations") @Metric("Number of file system operations")
public long totalFileOps(){ public long totalFileOps(){
@ -267,6 +271,14 @@ public class NameNodeMetrics {
storageBlockReportOps.incr(); storageBlockReportOps.incr();
} }
public void setBlockOpsQueued(int size) {
blockOpsQueued.set(size);
}
public void addBlockOpsBatched(int count) {
blockOpsBatched.incr(count);
}
public void addTransaction(long latency) { public void addTransaction(long latency) {
transactions.add(latency); transactions.add(latency);
} }

View File

@ -20,21 +20,32 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.Permission; import java.security.Permission;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -308,4 +319,131 @@ public class TestDatanodeRegistration {
} }
} }
} }
// IBRs are async operations to free up IPC handlers. This means the IBR
// response will not contain non-IPC level exceptions - which in practice
// should not occur other than dead/unregistered node which will trigger a
// re-registration. If a non-IPC exception does occur, the safety net is
// a forced re-registration on the next heartbeat.
@Test(timeout=10000)
public void testForcedRegistration() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
cluster.getHttpUri(0);
FSNamesystem fsn = cluster.getNamesystem();
String bpId = fsn.getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
// registration should not change after heartbeat.
assertTrue(dnd.isRegistered());
DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
waitForHeartbeat(dn, dnd);
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// force a re-registration on next heartbeat.
dnd.setForceRegistration(true);
assertFalse(dnd.isRegistered());
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
assertNotSame(lastReg, newReg);
lastReg = newReg;
// registration should not change on subsequent heartbeats.
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
assertTrue(waitForBlockReport(dn, dnd));
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// check that block report is not processed and registration didn't change.
dnd.setForceRegistration(true);
assertFalse(waitForBlockReport(dn, dnd));
assertFalse(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// heartbeat should trigger re-registration, and next block report should
// not change registration.
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
newReg = dn.getDNRegistrationForBP(bpId);
assertNotSame(lastReg, newReg);
lastReg = newReg;
assertTrue(waitForBlockReport(dn, dnd));
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// registration doesn't change.
ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
DataNodeTestUtils.triggerDeletionReport(dn);
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// a failed IBR will effectively unregister the node.
boolean failed = false;
try {
// pass null to cause a failure since there aren't any easy failure
// modes since it shouldn't happen.
fsn.processIncrementalBlockReport(lastReg, null);
} catch (NullPointerException npe) {
failed = true;
}
assertTrue("didn't fail", failed);
assertFalse(dnd.isRegistered());
// should remain unregistered until next heartbeat.
dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
DataNodeTestUtils.triggerDeletionReport(dn);
assertFalse(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
}
private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
throws Exception {
final long lastUpdate = dnd.getLastUpdateMonotonic();
Thread.sleep(1);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
DataNodeTestUtils.triggerHeartbeat(dn);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return lastUpdate != dnd.getLastUpdateMonotonic();
}
}, 10, 100000);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
private boolean waitForBlockReport(final DataNode dn,
final DatanodeDescriptor dnd) throws Exception {
final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
final long lastCount = storage.getBlockReportCount();
dn.triggerBlockReport(
new BlockReportOptions.Factory().setIncremental(false).build());
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return lastCount != storage.getBlockReportCount();
}
}, 10, 100);
} catch (TimeoutException te) {
return false;
}
return true;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -34,8 +35,20 @@ import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
@ -60,8 +73,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -831,4 +848,158 @@ public class TestBlockManager {
Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks, Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
null, excessTypes)); null, excessTypes));
} }
@Test
public void testBlockReportQueueing() throws Exception {
Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final FSNamesystem fsn = cluster.getNamesystem();
final BlockManager bm = fsn.getBlockManager();
final ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CountDownLatch endLatch = new CountDownLatch(3);
// create a task intended to block while processing, thus causing
// the queue to backup. simulates how a full BR is processed.
FutureTask<?> blockingOp = new FutureTask<Void>(
new Callable<Void>(){
@Override
public Void call() throws IOException {
return bm.runBlockOp(new Callable<Void>() {
@Override
public Void call()
throws InterruptedException, BrokenBarrierException {
// use a barrier to control the blocking.
startBarrier.await();
endLatch.countDown();
return null;
}
});
}
});
// create an async task. simulates how an IBR is processed.
Callable<?> asyncOp = new Callable<Void>(){
@Override
public Void call() throws IOException {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
// use the latch to signal if the op has run.
endLatch.countDown();
}
});
return null;
}
};
// calling get forces its execution so we can test if it's blocked.
Future<?> blockedFuture = executor.submit(blockingOp);
boolean isBlocked = false;
try {
// wait 1s for the future to block. it should run instantaneously.
blockedFuture.get(1, TimeUnit.SECONDS);
} catch (TimeoutException te) {
isBlocked = true;
}
assertTrue(isBlocked);
// should effectively return immediately since calls are queued.
// however they should be backed up in the queue behind the blocking
// operation.
executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
// check the async calls are queued, and first is still blocked.
assertEquals(2, bm.getBlockOpQueueLength());
assertFalse(blockedFuture.isDone());
// unblock the queue, wait for last op to complete, check the blocked
// call has returned
startBarrier.await(1, TimeUnit.SECONDS);
assertTrue(endLatch.await(1, TimeUnit.SECONDS));
assertEquals(0, bm.getBlockOpQueueLength());
assertTrue(blockingOp.isDone());
} finally {
cluster.shutdown();
}
}
// spam the block manager with IBRs to verify queuing is occurring.
@Test
public void testAsyncIBR() throws Exception {
Logger.getRootLogger().setLevel(Level.WARN);
// will create files with many small blocks.
final int blkSize = 4*1024;
final int fileSize = blkSize * 100;
final byte[] buf = new byte[2*blkSize];
final int numWriters = 4;
final int repl = 3;
final CyclicBarrier barrier = new CyclicBarrier(numWriters);
final CountDownLatch writeLatch = new CountDownLatch(numWriters);
final AtomicBoolean failure = new AtomicBoolean();
final Configuration conf = new HdfsConfiguration();
conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
try {
cluster.waitActive();
// create multiple writer threads to create a file with many blocks.
// will test that concurrent writing causes IBR batching in the NN
Thread[] writers = new Thread[numWriters];
for (int i=0; i < writers.length; i++) {
final Path p = new Path("/writer"+i);
writers[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream os =
fs.create(p, true, buf.length, (short)repl, blkSize);
// align writers for maximum chance of IBR batching.
barrier.await();
int remaining = fileSize;
while (remaining > 0) {
os.write(buf);
remaining -= buf.length;
}
os.close();
} catch (Exception e) {
e.printStackTrace();
failure.set(true);
}
// let main thread know we are done.
writeLatch.countDown();
}
});
writers[i].start();
}
// when and how many IBRs are queued is indeterminate, so just watch
// the metrics and verify something was queued at during execution.
boolean sawQueued = false;
while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
assertFalse(failure.get());
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
sawQueued |= (queued > 0);
}
assertFalse(failure.get());
assertTrue(sawQueued);
// verify that batching of the IBRs occurred.
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
assertTrue(batched > 0);
} finally {
cluster.shutdown();
}
}
} }

View File

@ -304,7 +304,8 @@ public class TestPendingReplication {
reportDnNum++; reportDnNum++;
} }
} }
// IBRs are async, make sure the NN processes all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3, assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0])); blkManager.pendingReplications.getNumReplicas(blocks[0]));
@ -322,6 +323,7 @@ public class TestPendingReplication {
} }
} }
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3, assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0])); blkManager.pendingReplications.getNumReplicas(blocks[0]));

View File

@ -113,9 +113,13 @@ public abstract class BlockReportTestBase {
@After @After
public void shutDownCluster() throws IOException { public void shutDownCluster() throws IOException {
fs.close(); if (fs != null) {
cluster.shutdownDataNodes(); fs.close();
cluster.shutdown(); }
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
}
} }
protected static void resetConfiguration() { protected static void resetConfiguration() {

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -187,7 +186,9 @@ public class TestIncrementalBrVariations {
} }
// Make sure that the deleted block from each storage was picked up // Make sure that the deleted block from each storage was picked up
// by the NameNode. // by the NameNode. IBRs are async, make sure the NN processes
// all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertThat(cluster.getNamesystem().getMissingBlocksCount(), assertThat(cluster.getNamesystem().getMissingBlocksCount(),
is((long) reports.length)); is((long) reports.length));
} }
@ -256,7 +257,8 @@ public class TestIncrementalBrVariations {
// Send the report to the NN. // Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports); cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
// IBRs are async, make sure the NN processes all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
// Make sure that the NN has learned of the new storage. // Make sure that the NN has learned of the new storage.
DatanodeStorageInfo storageInfo = cluster.getNameNode() DatanodeStorageInfo storageInfo = cluster.getNameNode()
.getNamesystem() .getNamesystem()

View File

@ -101,13 +101,13 @@ public class TestDeadDatanode {
StorageReceivedDeletedBlocks[] storageBlocks = { StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) }; new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
// Ensure blockReceived call from dead datanode is rejected with IOException // Ensure blockReceived call from dead datanode is not rejected with
try { // IOException, since it's async, but the node remains unregistered.
dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks); dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
fail("Expected IOException is not thrown"); BlockManager bm = cluster.getNamesystem().getBlockManager();
} catch (IOException ex) { // IBRs are async, make sure the NN processes all of them.
// Expected bm.flushBlockOps();
} assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
// Ensure blockReport from dead datanode is rejected with IOException // Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport( StorageBlockReport[] report = { new StorageBlockReport(