HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica invalidation.

This commit is contained in:
Andrew Wang 2015-06-24 14:42:33 -07:00
parent 4c659ddbf7
commit afe9ea3c12
10 changed files with 122 additions and 15 deletions

View File

@ -949,6 +949,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification.
(Kanaka Kumar Avvaru via jghoman)
HDFS-8546. Prune cached replicas from DatanodeDescriptor state on replica
invalidation. (wang)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -3108,6 +3109,19 @@ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
return;
}
CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
.get(new CachedBlock(block.getBlockId(), (short) 0, false));
if (cblock != null) {
boolean removed = false;
removed |= node.getPendingCached().remove(cblock);
removed |= node.getCached().remove(cblock);
removed |= node.getPendingUncached().remove(cblock);
if (removed) {
blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
+ "related lists on node {}", block, node);
}
}
//
// It's possible that the block was removed because of a datanode
// failure. If the block is still valid, check if replication is

View File

@ -709,8 +709,10 @@ private void offerService() throws Exception {
}
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
}
//
// There is no work to do; sleep until hearbeat timer elapses,

View File

@ -301,6 +301,7 @@ public static InetSocketAddress createSocketAddr(String target) {
ThreadGroup threadGroup = null;
private DNConf dnConf;
private volatile boolean heartbeatsDisabledForTests = false;
private volatile boolean cacheReportsDisabledForTests = false;
private DataStorage storage = null;
private DatanodeHttpServer httpServer = null;
@ -1055,15 +1056,27 @@ private BPOfferService getBPOSForBlock(ExtendedBlock block)
// used only for testing
@VisibleForTesting
void setHeartbeatsDisabledForTests(
boolean heartbeatsDisabledForTests) {
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
}
@VisibleForTesting
boolean areHeartbeatsDisabledForTests() {
return this.heartbeatsDisabledForTests;
}
@VisibleForTesting
void setCacheReportsDisabledForTest(boolean disabled) {
this.cacheReportsDisabledForTests = disabled;
}
@VisibleForTesting
boolean areCacheReportsDisabledForTests() {
return this.cacheReportsDisabledForTests;
}
/**
* This method starts the data node with the specified conf.
*

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@ -902,9 +903,26 @@ public void setCachedLocations(LocatedBlock block) {
if (cachedBlock == null) {
return;
}
List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
for (DatanodeDescriptor datanode : datanodes) {
block.addCachedLoc(datanode);
List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
for (DatanodeDescriptor datanode : cachedDNs) {
// Filter out cached blocks that do not have a backing replica.
//
// This should not happen since it means the CacheManager thinks
// something is cached that does not exist, but it's a safety
// measure.
boolean found = false;
for (DatanodeInfo loc : block.getLocations()) {
if (loc.equals(datanode)) {
block.addCachedLoc(loc);
found = true;
break;
}
}
if (!found) {
LOG.warn("Datanode {} is not a valid cache location for block {} "
+ "because that node does not have a backing replica!",
datanode, block.getBlock().getBlockName());
}
}
}

View File

@ -6460,6 +6460,7 @@ public void setFSDirectory(FSDirectory dir) {
this.dir = dir;
}
/** @return the cache manager. */
@Override
public CacheManager getCacheManager() {
return cacheManager;
}

View File

@ -29,21 +29,23 @@
@InterfaceAudience.Private
public interface Namesystem extends RwLock, SafeMode {
/** Is this name system running? */
public boolean isRunning();
boolean isRunning();
/** Check if the user has superuser privilege. */
public void checkSuperuserPrivilege() throws AccessControlException;
void checkSuperuserPrivilege() throws AccessControlException;
/** @return the block pool ID */
public String getBlockPoolId();
String getBlockPoolId();
public boolean isInStandbyState();
boolean isInStandbyState();
public boolean isGenStampInFuture(Block block);
boolean isGenStampInFuture(Block block);
public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
public void checkOperation(OperationCategory read) throws StandbyException;
void checkOperation(OperationCategory read) throws StandbyException;
public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
CacheManager getCacheManager();
}

View File

@ -79,6 +79,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FsShell;
@ -526,6 +527,23 @@ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
}
}
public static void waitForReplication(final DistributedFileSystem dfs,
final Path file, final short replication, int waitForMillis)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
FileStatus stat = dfs.getFileStatus(file);
return replication == stat.getReplication();
} catch (IOException e) {
LOG.info("getFileStatus on path " + file + " failed!", e);
return false;
}
}
}, 100, waitForMillis);
}
/**
* Keep accessing the given file until the namenode reports that the
* given block in the file contains the given number of corrupt replicas.

View File

@ -23,6 +23,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -53,6 +54,16 @@ public static void setHeartbeatsDisabledForTests(DataNode dn,
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
}
/**
* Set if cache reports are disabled for all DNs in a mini cluster.
*/
public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster,
boolean disabled) {
for (DataNode dn : cluster.getDataNodes()) {
dn.setCacheReportsDisabledForTest(disabled);
}
}
public static void triggerDeletionReport(DataNode dn) throws IOException {
for (BPOfferService bpos : dn.getAllBpOs()) {
bpos.triggerDeletionReportForTests();

View File

@ -76,6 +76,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@ -1510,4 +1511,28 @@ public void testExceedsCapacity() throws Exception {
Thread.sleep(1000);
checkPendingCachedEmpty(cluster);
}
@Test(timeout=60000)
public void testNoBackingReplica() throws Exception {
// Cache all three replicas for a file.
final Path filename = new Path("/noback");
final short replication = (short) 3;
DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC);
dfs.addCachePool(new CachePoolInfo("pool"));
dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename)
.setReplication(replication).build());
waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1");
// Pause cache reports while we change the replication factor.
// This will orphan some cached replicas.
DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
try {
dfs.setReplication(filename, (short) 1);
DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000);
// The cache locations should drop down to 1 even without cache reports.
waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2");
} finally {
DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
}
}
}