HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)

(cherry picked from commit f741476146)
This commit is contained in:
Uma Mahesh 2015-12-16 18:16:39 -08:00
parent 93c483889c
commit e6dc6f24a2
14 changed files with 542 additions and 40 deletions

View File

@ -1586,6 +1586,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9429. Tests in TestDFSAdminWithHA intermittently fail with
EOFException (Xiao Chen via Colin P. McCabe)
HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -34,6 +34,11 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@ -90,6 +95,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@ -186,6 +192,10 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
new BlockReportProcessingThread();
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@ -470,13 +480,16 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
}
public void close() {
try {
replicationThread.interrupt();
blockReportThread.interrupt();
replicationThread.join(3000);
blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
datanodeManager.close();
@ -1805,7 +1818,7 @@ public class BlockManager implements BlockStatsMXBean {
try {
node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive()) {
if (node == null || !node.isRegistered()) {
throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID);
}
@ -3157,17 +3170,23 @@ public class BlockManager implements BlockStatsMXBean {
public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException {
assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive()) {
if (node == null || !node.isRegistered()) {
blockLog.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node {}", nodeID);
throw new IOException(
"Got incremental block report from unregistered or dead node");
}
try {
processIncrementalBlockReport(node, srdb);
} catch (Exception ex) {
node.setForceRegistration(true);
throw ex;
}
}
private void processIncrementalBlockReport(final DatanodeDescriptor node,
final StorageReceivedDeletedBlocks srdb) throws IOException {
DatanodeStorageInfo storageInfo =
node.getStorageInfo(srdb.getStorage().getStorageID());
if (storageInfo == null) {
@ -3179,6 +3198,10 @@ public class BlockManager implements BlockStatsMXBean {
storageInfo = node.updateStorage(srdb.getStorage());
}
int received = 0;
int deleted = 0;
int receiving = 0;
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
@ -3196,17 +3219,17 @@ public class BlockManager implements BlockStatsMXBean {
break;
default:
String msg =
"Unknown block status code reported by " + nodeID +
"Unknown block status code reported by " + node +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
blockLog.debug("BLOCK* block {}: {} is received from {}",
rdbi.getStatus(), rdbi.getBlock(), nodeID);
rdbi.getStatus(), rdbi.getBlock(), node);
}
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
+ "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
+ "{} receiving: {}, received: {}, deleted: {}", node, receiving,
received, deleted);
}
@ -3807,4 +3830,119 @@ public class BlockManager implements BlockStatsMXBean {
return false;
}
// async processing of an action, used for IBRs.
public void enqueueBlockOp(final Runnable action) throws IOException {
try {
blockReportThread.enqueue(action);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
// sync batch processing for a full BR.
public <T> T runBlockOp(final Callable<T> action)
throws IOException {
final FutureTask<T> future = new FutureTask<T>(action);
enqueueBlockOp(future);
try {
return future.get();
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause == null) {
cause = ee;
}
if (!(cause instanceof IOException)) {
cause = new IOException(cause);
}
throw (IOException)cause;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException(ie);
}
}
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@Override
public Void call() {
return null;
}
});
}
public int getBlockOpQueueLength() {
return blockReportThread.queue.size();
}
private class BlockReportProcessingThread extends Thread {
private static final long MAX_LOCK_HOLD_MS = 4;
private long lastFull = 0;
private final BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<Runnable>(1024);
BlockReportProcessingThread() {
super("Block report processor");
setDaemon(true);
}
@Override
public void run() {
try {
processQueue();
} catch (Throwable t) {
ExitUtil.terminate(1,
getName() + " encountered fatal exception: " + t);
}
}
private void processQueue() {
while (namesystem.isRunning()) {
NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
try {
Runnable action = queue.take();
// batch as many operations in the write lock until the queue
// runs dry, or the max lock hold is reached.
int processed = 0;
namesystem.writeLock();
metrics.setBlockOpsQueued(queue.size() + 1);
try {
long start = Time.monotonicNow();
do {
processed++;
action.run();
if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
break;
}
action = queue.poll();
} while (action != null);
} finally {
namesystem.writeUnlock();
metrics.addBlockOpsBatched(processed - 1);
}
} catch (InterruptedException e) {
// ignore unless thread was specifically interrupted.
if (Thread.interrupted()) {
break;
}
}
}
queue.clear();
}
void enqueue(Runnable action) throws InterruptedException {
if (!queue.offer(action)) {
if (!isAlive() && namesystem.isRunning()) {
ExitUtil.terminate(1, getName()+" is not running");
}
long now = Time.monotonicNow();
if (now - lastFull > 4000) {
lastFull = now;
LOG.info("Block report queue is full");
}
queue.put(action);
}
}
}
}

View File

@ -189,6 +189,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
// This is an optimization, because contains takes O(n) time on Arraylist
private boolean isAlive = false;
private boolean needKeyUpdate = false;
private boolean forceRegistration = false;
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
@ -301,7 +302,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
DatanodeStorageInfo[] getStorageInfos() {
@VisibleForTesting
public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@ -824,6 +826,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
storage.setBlockReportCount(0);
}
heartbeatedSinceRegistration = false;
forceRegistration = false;
}
/**
@ -906,6 +909,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false;
}
return true;
}
}
public void setForceRegistration(boolean force) {
forceRegistration = force;
}
public boolean isRegistered() {
return isAlive() && !forceRegistration;
}
}

View File

@ -1343,7 +1343,7 @@ public class DatanodeManager {
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive()) {
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}

View File

@ -151,7 +151,7 @@ public class DatanodeStorageInfo {
this.state = s.getState();
}
int getBlockReportCount() {
public int getBlockReportCount() {
return blockReportCount;
}

View File

@ -934,7 +934,7 @@ public final class CacheManager {
try {
final DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(datanodeID);
if (datanode == null || !datanode.isAlive()) {
if (datanode == null || !datanode.isRegistered()) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " +
datanode);

View File

@ -36,6 +36,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
@ -1358,9 +1359,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
String poolId, StorageBlockReport[] reports,
BlockReportContext context) throws IOException {
public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
String poolId, final StorageBlockReport[] reports,
final BlockReportContext context) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
@ -1376,8 +1377,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
blocks, context, (r == reports.length - 1));
final int index = r;
noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context, (index == reports.length - 1));
}
});
metrics.incrStorageBlockReportOps();
}
BlockManagerFaultInjector.getInstance().
@ -1407,8 +1414,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
@ -1417,8 +1425,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
namesystem.processIncrementalBlockReport(nodeReg, r);
final BlockManager bm = namesystem.getBlockManager();
for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
try {
namesystem.processIncrementalBlockReport(nodeReg, r);
} catch (Exception ex) {
// usually because the node is unregistered/dead. next heartbeat
// will correct the problem
blockStateChangeLog.error(
"*BLOCK* NameNode.blockReceivedAndDeleted: "
+ "failed from " + nodeReg + ": " + ex.getMessage());
}
}
});
}
}

View File

@ -76,6 +76,10 @@ public class NameNodeMetrics {
MutableCounterLong blockReceivedAndDeletedOps;
@Metric("Number of blockReports from individual storages")
MutableCounterLong storageBlockReportOps;
@Metric("Number of blockReports and blockReceivedAndDeleted queued")
MutableGaugeInt blockOpsQueued;
@Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
MutableCounterLong blockOpsBatched;
@Metric("Number of file system operations")
public long totalFileOps(){
@ -267,6 +271,14 @@ public class NameNodeMetrics {
storageBlockReportOps.incr();
}
public void setBlockOpsQueued(int size) {
blockOpsQueued.set(size);
}
public void addBlockOpsBatched(int count) {
blockOpsBatched.incr(count);
}
public void addTransaction(long latency) {
transactions.add(latency);
}

View File

@ -20,21 +20,32 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Test;
import com.google.common.base.Supplier;
import java.net.InetSocketAddress;
import java.security.Permission;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
@ -308,4 +319,131 @@ public class TestDatanodeRegistration {
}
}
}
// IBRs are async operations to free up IPC handlers. This means the IBR
// response will not contain non-IPC level exceptions - which in practice
// should not occur other than dead/unregistered node which will trigger a
// re-registration. If a non-IPC exception does occur, the safety net is
// a forced re-registration on the next heartbeat.
@Test(timeout=10000)
public void testForcedRegistration() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
cluster.getHttpUri(0);
FSNamesystem fsn = cluster.getNamesystem();
String bpId = fsn.getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
// registration should not change after heartbeat.
assertTrue(dnd.isRegistered());
DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
waitForHeartbeat(dn, dnd);
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// force a re-registration on next heartbeat.
dnd.setForceRegistration(true);
assertFalse(dnd.isRegistered());
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
assertNotSame(lastReg, newReg);
lastReg = newReg;
// registration should not change on subsequent heartbeats.
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
assertTrue(waitForBlockReport(dn, dnd));
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// check that block report is not processed and registration didn't change.
dnd.setForceRegistration(true);
assertFalse(waitForBlockReport(dn, dnd));
assertFalse(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// heartbeat should trigger re-registration, and next block report should
// not change registration.
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
newReg = dn.getDNRegistrationForBP(bpId);
assertNotSame(lastReg, newReg);
lastReg = newReg;
assertTrue(waitForBlockReport(dn, dnd));
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// registration doesn't change.
ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
DataNodeTestUtils.triggerDeletionReport(dn);
assertTrue(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
// a failed IBR will effectively unregister the node.
boolean failed = false;
try {
// pass null to cause a failure since there aren't any easy failure
// modes since it shouldn't happen.
fsn.processIncrementalBlockReport(lastReg, null);
} catch (NullPointerException npe) {
failed = true;
}
assertTrue("didn't fail", failed);
assertFalse(dnd.isRegistered());
// should remain unregistered until next heartbeat.
dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
DataNodeTestUtils.triggerDeletionReport(dn);
assertFalse(dnd.isRegistered());
assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
waitForHeartbeat(dn, dnd);
assertTrue(dnd.isRegistered());
assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
}
private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
throws Exception {
final long lastUpdate = dnd.getLastUpdateMonotonic();
Thread.sleep(1);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
DataNodeTestUtils.triggerHeartbeat(dn);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return lastUpdate != dnd.getLastUpdateMonotonic();
}
}, 10, 100000);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
private boolean waitForBlockReport(final DataNode dn,
final DatanodeDescriptor dnd) throws Exception {
final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
final long lastCount = storage.getBlockReportCount();
dn.triggerBlockReport(
new BlockReportOptions.Factory().setIncremental(false).build());
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return lastCount != storage.getBlockReportCount();
}
}, 10, 100);
} catch (TimeoutException te) {
return false;
}
return true;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -34,8 +35,20 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@ -60,8 +73,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -831,4 +848,158 @@ public class TestBlockManager {
Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
null, excessTypes));
}
@Test
public void testBlockReportQueueing() throws Exception {
Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final FSNamesystem fsn = cluster.getNamesystem();
final BlockManager bm = fsn.getBlockManager();
final ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CountDownLatch endLatch = new CountDownLatch(3);
// create a task intended to block while processing, thus causing
// the queue to backup. simulates how a full BR is processed.
FutureTask<?> blockingOp = new FutureTask<Void>(
new Callable<Void>(){
@Override
public Void call() throws IOException {
return bm.runBlockOp(new Callable<Void>() {
@Override
public Void call()
throws InterruptedException, BrokenBarrierException {
// use a barrier to control the blocking.
startBarrier.await();
endLatch.countDown();
return null;
}
});
}
});
// create an async task. simulates how an IBR is processed.
Callable<?> asyncOp = new Callable<Void>(){
@Override
public Void call() throws IOException {
bm.enqueueBlockOp(new Runnable() {
@Override
public void run() {
// use the latch to signal if the op has run.
endLatch.countDown();
}
});
return null;
}
};
// calling get forces its execution so we can test if it's blocked.
Future<?> blockedFuture = executor.submit(blockingOp);
boolean isBlocked = false;
try {
// wait 1s for the future to block. it should run instantaneously.
blockedFuture.get(1, TimeUnit.SECONDS);
} catch (TimeoutException te) {
isBlocked = true;
}
assertTrue(isBlocked);
// should effectively return immediately since calls are queued.
// however they should be backed up in the queue behind the blocking
// operation.
executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
// check the async calls are queued, and first is still blocked.
assertEquals(2, bm.getBlockOpQueueLength());
assertFalse(blockedFuture.isDone());
// unblock the queue, wait for last op to complete, check the blocked
// call has returned
startBarrier.await(1, TimeUnit.SECONDS);
assertTrue(endLatch.await(1, TimeUnit.SECONDS));
assertEquals(0, bm.getBlockOpQueueLength());
assertTrue(blockingOp.isDone());
} finally {
cluster.shutdown();
}
}
// spam the block manager with IBRs to verify queuing is occurring.
@Test
public void testAsyncIBR() throws Exception {
Logger.getRootLogger().setLevel(Level.WARN);
// will create files with many small blocks.
final int blkSize = 4*1024;
final int fileSize = blkSize * 100;
final byte[] buf = new byte[2*blkSize];
final int numWriters = 4;
final int repl = 3;
final CyclicBarrier barrier = new CyclicBarrier(numWriters);
final CountDownLatch writeLatch = new CountDownLatch(numWriters);
final AtomicBoolean failure = new AtomicBoolean();
final Configuration conf = new HdfsConfiguration();
conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
try {
cluster.waitActive();
// create multiple writer threads to create a file with many blocks.
// will test that concurrent writing causes IBR batching in the NN
Thread[] writers = new Thread[numWriters];
for (int i=0; i < writers.length; i++) {
final Path p = new Path("/writer"+i);
writers[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream os =
fs.create(p, true, buf.length, (short)repl, blkSize);
// align writers for maximum chance of IBR batching.
barrier.await();
int remaining = fileSize;
while (remaining > 0) {
os.write(buf);
remaining -= buf.length;
}
os.close();
} catch (Exception e) {
e.printStackTrace();
failure.set(true);
}
// let main thread know we are done.
writeLatch.countDown();
}
});
writers[i].start();
}
// when and how many IBRs are queued is indeterminate, so just watch
// the metrics and verify something was queued at during execution.
boolean sawQueued = false;
while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
assertFalse(failure.get());
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
sawQueued |= (queued > 0);
}
assertFalse(failure.get());
assertTrue(sawQueued);
// verify that batching of the IBRs occurred.
MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
assertTrue(batched > 0);
} finally {
cluster.shutdown();
}
}
}

View File

@ -304,7 +304,8 @@ public class TestPendingReplication {
reportDnNum++;
}
}
// IBRs are async, make sure the NN processes all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
@ -322,6 +323,7 @@ public class TestPendingReplication {
}
}
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));

View File

@ -113,9 +113,13 @@ public abstract class BlockReportTestBase {
@After
public void shutDownCluster() throws IOException {
fs.close();
cluster.shutdownDataNodes();
cluster.shutdown();
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
}
}
protected static void resetConfiguration() {

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -187,7 +186,9 @@ public class TestIncrementalBrVariations {
}
// Make sure that the deleted block from each storage was picked up
// by the NameNode.
// by the NameNode. IBRs are async, make sure the NN processes
// all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertThat(cluster.getNamesystem().getMissingBlocksCount(),
is((long) reports.length));
}
@ -256,7 +257,8 @@ public class TestIncrementalBrVariations {
// Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
// IBRs are async, make sure the NN processes all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
// Make sure that the NN has learned of the new storage.
DatanodeStorageInfo storageInfo = cluster.getNameNode()
.getNamesystem()

View File

@ -101,13 +101,13 @@ public class TestDeadDatanode {
StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
// Ensure blockReceived call from dead datanode is rejected with IOException
try {
dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
}
// Ensure blockReceived call from dead datanode is not rejected with
// IOException, since it's async, but the node remains unregistered.
dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
BlockManager bm = cluster.getNamesystem().getBlockManager();
// IBRs are async, make sure the NN processes all of them.
bm.flushBlockOps();
assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(