HDFS-10301. Remove FBR tracking state to fix false zombie storage detection for interleaving block reports. Contributed by Vinitha Gankidi.

(cherry picked from commit 391ce535a7)
Authored by Vinitha Reddy Gankidi on 2016-10-14 10:37:44 -07:00; committed by Konstantin V Shvachko
parent b9fdbd7107
commit 2304501bcf
7 changed files with 144 additions and 127 deletions

View File

@@ -61,7 +61,6 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -1233,6 +1232,8 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
       invalidateBlocks.remove(node, block);
     }
     namesystem.checkSafeMode();
+    LOG.info("Removed blocks associated with storage {} from DataNode {}",
+        storageInfo, node);
   }
 
   /**
@@ -1942,8 +1943,8 @@ static class StatefulBlockInfo {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport, BlockReportContext context,
-      boolean lastStorageInRpc) throws IOException {
+      final BlockListAsLongs newReport,
+      BlockReportContext context) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -1997,32 +1998,6 @@ public boolean processReport(final DatanodeID nodeID,
       }
 
       storageInfo.receivedBlockReport();
-      if (context != null) {
-        storageInfo.setLastBlockReportId(context.getReportId());
-        if (lastStorageInRpc) {
-          int rpcsSeen = node.updateBlockReportContext(context);
-          if (rpcsSeen >= context.getTotalRpcs()) {
-            long leaseId = blockReportLeaseManager.removeLease(node);
-            BlockManagerFaultInjector.getInstance().
-                removeBlockReportLease(node, leaseId);
-            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
-            if (zombies.isEmpty()) {
-              LOG.debug("processReport 0x{}: no zombie storages found.",
-                  Long.toHexString(context.getReportId()));
-            } else {
-              for (DatanodeStorageInfo zombie : zombies) {
-                removeZombieReplicas(context, zombie);
-              }
-            }
-            node.clearBlockReportContext();
-          } else {
-            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
-                "report.", Long.toHexString(context.getReportId()),
-                (context.getTotalRpcs() - rpcsSeen)
-                );
-          }
-        }
-      }
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -2047,30 +2022,25 @@ public boolean processReport(final DatanodeID nodeID,
     return !node.hasStaleStorages();
   }
 
-  private void removeZombieReplicas(BlockReportContext context,
-      DatanodeStorageInfo zombie) {
-    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
-        "longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
-    Iterator<BlockInfo> iter = zombie.getBlockIterator();
-    int prevBlocks = zombie.numBlocks();
-    while (iter.hasNext()) {
-      BlockInfo block = iter.next();
-      // We assume that a block can be on only one storage in a DataNode.
-      // That's why we pass in the DatanodeDescriptor rather than the
-      // DatanodeStorageInfo.
-      // TODO: remove this assumption in case we want to put a block on
-      // more than one storage on a datanode (and because it's a difficult
-      // assumption to really enforce)
-      removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
-    }
-    assert(zombie.numBlocks() == 0);
-    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
-        "which no longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), prevBlocks,
-        zombie.getStorageID());
+  public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
+      final BlockReportContext context) throws IOException {
+    namesystem.writeLock();
+    DatanodeDescriptor node;
+    try {
+      node = datanodeManager.getDatanode(nodeID);
+      if (context != null) {
+        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+          BlockManagerFaultInjector.getInstance().
+              removeBlockReportLease(node, leaseId);
+        }
+        LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+            + "processReport 0x{}", context.getCurRpc(),
            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
   }
 
   /**
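Note on the BlockManager hunks above: processReport no longer performs end-of-report zombie storage detection, and the only end-of-report bookkeeping left is releasing the block report lease. The new removeBRLeaseIfNeeded makes that decision per RPC, purely from the counters carried in BlockReportContext, with no per-storage or per-DataNode tracking state. A minimal standalone sketch of that rule follows; the class and method names are illustrative stand-ins, not the real HDFS types.

public class BrLeaseRuleSketch {
  // Stand-in for the per-RPC counters carried by BlockReportContext.
  static final class ReportContext {
    final int totalRpcs;
    final int curRpc;
    ReportContext(int totalRpcs, int curRpc) {
      this.totalRpcs = totalRpcs;
      this.curRpc = curRpc;
    }
  }

  // Mirrors the condition in removeBRLeaseIfNeeded: release the lease only
  // on the final RPC of a (possibly multi-RPC) full block report.
  static boolean shouldRemoveLease(ReportContext ctx) {
    return ctx != null && ctx.totalRpcs == ctx.curRpc + 1;
  }

  public static void main(String[] args) {
    // A report split across 3 RPCs: only the third RPC releases the lease.
    for (int rpc = 0; rpc < 3; rpc++) {
      System.out.println("rpc " + rpc + " -> removeLease="
          + shouldRemoveLease(new ReportContext(3, rpc)));
    }
  }
}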

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,7 +30,6 @@
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -41,7 +39,6 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -53,8 +50,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -67,8 +62,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
       LoggerFactory.getLogger(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
-  private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
-      ImmutableList.of();
 
   /** Block and targets pair */
   @InterfaceAudience.Private
@@ -153,10 +146,6 @@ public Type getType() {
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
-
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -253,20 +242,6 @@ public DatanodeDescriptor(DatanodeID nodeID,
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
-
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -330,31 +305,6 @@ boolean hasStaleStorages() {
     }
   }
 
-  List<DatanodeStorageInfo> removeZombieStorages() {
-    List<DatanodeStorageInfo> zombies = null;
-    synchronized (storageMap) {
-      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
-          storageMap.entrySet().iterator();
-      while (iter.hasNext()) {
-        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
-        DatanodeStorageInfo storageInfo = entry.getValue();
-        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
-          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
-              storageInfo.getStorageID(),
-              Long.toHexString(storageInfo.getLastBlockReportId()),
-              Long.toHexString(curBlockReportId));
-          iter.remove();
-          if (zombies == null) {
-            zombies = new LinkedList<>();
-          }
-          zombies.add(storageInfo);
-        }
-        storageInfo.setLastBlockReportId(0);
-      }
-    }
-    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);
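Note on the DatanodeDescriptor hunks above: the removed fields and methods are the FBR tracking state named in the commit summary. curBlockReportId was a single value per DataNode and lastBlockReportId a single value per storage, so when the RPCs of two full block reports interleaved, a storage stamped by the older report no longer matched the node-wide id and removeZombieStorages() evicted it even though it was alive and reported. A small self-contained simulation of that false positive; the names below are invented for illustration and are not HDFS classes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ZombieFalsePositiveSketch {
  public static void main(String[] args) {
    // Two storages on one DataNode, each stamped with the id of the last
    // report that touched it (mirrors the removed lastBlockReportId field).
    Map<String, Long> lastReportIdByStorage = new LinkedHashMap<>();

    long reportA = 0xA; // an older full block report
    long reportB = 0xB; // a newer report whose RPCs interleave with A's

    // Interleaving: storage s1 is processed under report B, but the RPC for
    // storage s2 from report A arrives late and stamps s2 with the older id.
    lastReportIdByStorage.put("s1", reportB);
    lastReportIdByStorage.put("s2", reportA);

    // The removed removeZombieStorages() compared each storage's stamp with
    // the node-wide current report id and evicted any mismatch.
    long curBlockReportId = reportB;
    List<String> zombies = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastReportIdByStorage.entrySet()) {
      if (e.getValue() != curBlockReportId) {
        zombies.add(e.getKey()); // s2 is flagged although it is healthy
      }
    }
    System.out.println("falsely detected zombies: " + zombies);
  }
}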

View File

@@ -123,9 +123,6 @@ public void remove() {
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
 
-  // The ID of the last full block report which updated this storage.
-  private long lastBlockReportId = 0;
-
   /** The number of block reports received */
   private int blockReportCount = 0;
@@ -190,14 +187,6 @@ public void setUtilizationForTesting(long capacity, long dfsUsed,
     this.blockPoolUsed = blockPoolUsed;
   }
 
-  long getLastBlockReportId() {
-    return lastBlockReportId;
-  }
-
-  void setLastBlockReportId(long lastBlockReportId) {
-    this.lastBlockReportId = lastBlockReportId;
-  }
-
   State getState() {
     return this.state;
   }

View File

@@ -1424,11 +1424,13 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
           @Override
           public Boolean call() throws IOException {
             return bm.processReport(nodeReg, reports[index].getStorage(),
-                blocks, context, (index == reports.length - 1));
+                blocks, context);
           }
         });
       metrics.incrStorageBlockReportOps();
     }
+    bm.removeBRLeaseIfNeeded(nodeReg, context);
+
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
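Note on the NameNodeRpcServer hunk above: the per-storage processReport call no longer receives a "last storage in this RPC" flag; after every storage report in the RPC has been processed, the server calls bm.removeBRLeaseIfNeeded once for the whole RPC. A rough sketch of that call shape, with invented stand-in interfaces rather than the real BlockManager and report types:

import java.io.IOException;
import java.util.List;

public class BlockReportRpcSketch {
  interface StorageReportLike { }
  interface ContextLike { }
  interface BlockManagerLike {
    boolean processReport(StorageReportLike report, ContextLike ctx)
        throws IOException;
    void removeBRLeaseIfNeeded(ContextLike ctx) throws IOException;
  }

  // Handle one block report RPC: process each per-storage report, then let
  // the BlockManager decide (from the context alone) whether to drop the
  // block report lease for this DataNode.
  static void handleBlockReportRpc(BlockManagerLike bm,
      List<StorageReportLike> reports, ContextLike ctx) throws IOException {
    for (StorageReportLike report : reports) {
      bm.processReport(report, ctx);
    }
    bm.removeBRLeaseIfNeeded(ctx);
  }
}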

View File

@@ -83,6 +83,7 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -713,12 +714,12 @@ public void testSafeModeIBR() throws Exception {
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +730,7 @@ public void testSafeModeIBR() throws Exception {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +759,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
   }
@@ -831,7 +832,8 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(), null, false);
+        builder.build(),
+        new BlockReportContext(1, 0, System.nanoTime(), 0));
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct

View File

@@ -19,23 +19,22 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Collection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -46,7 +45,6 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,13 +53,11 @@
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
@@ -159,6 +155,8 @@ public void testStorageWithBlocksIsNotPruned() throws IOException {
   public void testRemovingStorageDoesNotProduceZombies() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
     final int NUM_STORAGES_PER_DN = 2;
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(conf).numDataNodes(3)
@@ -257,7 +255,7 @@ public Boolean get() {
           assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
           return true;
         }
-      }, 10, 30000);
+      }, 1000, 30000);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -365,4 +363,60 @@ public Boolean get() {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=300000)
+  public void testNameNodePrunesUnreportedStorages() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // Create a cluster with one datanode with two storages
+    MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1)
+        .storagesPerDatanode(2)
+        .build();
+    // Create two files to ensure each storage has a block
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EE);
+    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"),
+        102400, 102400, 102400, (short)1,
+        0x1BAD5EED);
+    // Get the datanode storages and data directories
+    DataNode dn = cluster.getDataNodes().get(0);
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+        getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+    DatanodeStorageInfo[] dnStoragesInfosBeforeRestart =
+        dnDescriptor.getStorageInfos();
+    Collection<String> oldDirs = new ArrayList<String>(dn.getConf().
+        getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+    // Keep the first data directory and remove the second.
+    String newDirs = oldDirs.iterator().next();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    // Restart the datanode with the new conf
+    cluster.stopDataNode(0);
+    cluster.startDataNodes(conf, 1, false, null, null);
+    dn = cluster.getDataNodes().get(0);
+    cluster.waitActive();
+    // Assert that the dnDescriptor has both the storages after restart
+    assertArrayEquals(dnStoragesInfosBeforeRestart,
+        dnDescriptor.getStorageInfos());
+    // Assert that the removed storage is marked as FAILED
+    // when DN heartbeats to the NN
+    int numFailedStoragesWithBlocks = 0;
+    DatanodeStorageInfo failedStorageInfo = null;
+    for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) {
+      if (dnStorageInfo.areBlocksOnFailedStorage()) {
+        numFailedStoragesWithBlocks++;
+        failedStorageInfo = dnStorageInfo;
+      }
+    }
+    assertEquals(1, numFailedStoragesWithBlocks);
+    // Heartbeat manager removes the blocks associated with this failed storage
+    bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+    assertTrue(!failedStorageInfo.areBlocksOnFailedStorage());
+    // pruneStorageMap removes the unreported storage
+    cluster.triggerHeartbeats();
+    // Assert that the unreported storage is pruned
+    assertEquals(DataNode.getStorageLocations(dn.getConf()).size(),
+        dnDescriptor.getStorageInfos().length);
+  }
 }

View File

@@ -29,7 +29,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -50,7 +55,10 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -649,6 +657,48 @@ protected Object passThrough(InvocationOnMock invocation)
     DFSTestUtil.readFile(fs, filePath);
   }
 
+  // See HDFS-10301
+  @Test(timeout = 300000)
+  public void testInterleavedBlockReports()
+      throws IOException, ExecutionException, InterruptedException {
+    int numConcurrentBlockReports = 3;
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    final String poolId = cluster.getNamesystem().getBlockPoolId();
+    LOG.info("Block pool id: " + poolId);
+    final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    final StorageBlockReport[] reports =
+        getBlockReports(dn, poolId, true, true);
+    // Get the list of storage ids associated with the datanode
+    // before the test
+    BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+    final DatanodeDescriptor dnDescriptor =
+        bm.getDatanodeManager().getDatanode(dn.getDatanodeId());
+    DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+    // Send the block report concurrently using
+    // numThreads=numConcurrentBlockReports
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(numConcurrentBlockReports);
+    List<Future<Void>> futureList = new ArrayList<>(numConcurrentBlockReports);
+    for (int i = 0; i < numConcurrentBlockReports; i++) {
+      futureList.add(executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          sendBlockReports(dnR, poolId, reports);
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> future : futureList) {
+      future.get();
+    }
+    executorService.shutdown();
+    // Verify that the storages match before and after the test
+    Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+  }
+
   private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;