HDFS-6107. When a block cannot be cached due to limited space on the DataNode, it becomes uncacheable (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578515 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-03-17 19:00:53 +00:00
parent ed83fe61e2
commit 453c36b191
6 changed files with 106 additions and 14 deletions

View File

@ -445,6 +445,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5981. PBImageXmlWriter generates malformed XML.
(Haohui Mai via cnauroth)
HDFS-6107. When a block can't be cached due to limited space on the
DataNode, that block becomes uncacheable (cmccabe)
BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)

View File

@ -598,14 +598,12 @@ class BPOfferService {
blockIdCmd.getBlockPoolId() + " of [" +
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
break;
case DatanodeProtocol.DNA_UNCACHE:
LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
blockIdCmd.getBlockPoolId() + " of [" +
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

View File

@ -1041,7 +1041,7 @@ public class DataNode extends Configured
}
}
DataNodeMetrics getMetrics() {
public DataNodeMetrics getMetrics() {
return metrics;
}

View File

@ -337,15 +337,16 @@ public class FsDatasetCache {
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = usedBytesCount.reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
" of " + maxBytes + " exceeded.");
numBlocksFailedToCache.incrementAndGet();
return;
}
try {
reservedBytes = true;
try {
blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
@ -391,10 +392,13 @@ public class FsDatasetCache {
}
dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
numBlocksCached.addAndGet(1);
dataset.datanode.getMetrics().incrBlocksCached(1);
success = true;
} finally {
if (!success) {
if (reservedBytes) {
newUsedBytes = usedBytesCount.release(length);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Caching of " + key + " was aborted. We are now " +
"caching only " + newUsedBytes + " + bytes in total.");
@ -439,6 +443,7 @@ public class FsDatasetCache {
long newUsedBytes =
usedBytesCount.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (LOG.isDebugEnabled()) {
LOG.debug("Uncaching of " + key + " completed. " +
"usedBytes = " + newUsedBytes);

View File

@ -1143,6 +1143,11 @@ public class DFSTestUtil {
}
return false;
}
LOG.info("verifyExpectedCacheUsage: got " +
curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
curBlocks + "/" + expectedBlocks + " blocks cached. " +
"memlock limit = " +
NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
return true;
}
}, 100, 60000);

View File

@ -40,12 +40,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -80,6 +83,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
public class TestFsDatasetCache {
private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
@ -349,10 +353,13 @@ public class TestFsDatasetCache {
fsd.getNumBlocksFailedToCache() > 0);
// Uncache the n-1 files
int curCachedBlocks = 16;
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
total -= rounder.round(fileSizes[i]);
DFSTestUtil.verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i), fsd);
long uncachedBytes = rounder.round(fileSizes[i]);
total -= uncachedBytes;
curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
}
LOG.info("finishing testFilesExceedMaxLockedMemory");
}
@ -491,4 +498,78 @@ public class TestFsDatasetCache {
MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
}
@Test(timeout=60000)
public void testReCacheAfterUncache() throws Exception {
final int TOTAL_BLOCKS_PER_CACHE =
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
BlockReaderTestUtil.enableHdfsCachingTracing();
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
// Create a small file
final Path SMALL_FILE = new Path("/smallFile");
DFSTestUtil.createFile(fs, SMALL_FILE,
BLOCK_SIZE, (short)1, 0xcafe);
// Create a file that will take up the whole cache
final Path BIG_FILE = new Path("/bigFile");
DFSTestUtil.createFile(fs, BIG_FILE,
TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 0xbeef);
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.addCachePool(new CachePoolInfo("pool"));
final long bigCacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool("pool").setPath(BIG_FILE).setReplication((short)1).build());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
if (blocksCached != TOTAL_BLOCKS_PER_CACHE) {
LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " +
"be cached. Right now only " + blocksCached + " blocks are cached.");
return false;
}
LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
return true;
}
}, 1000, 30000);
// Try to cache a smaller file. It should fail.
final long shortCacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build());
Thread.sleep(10000);
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE,
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));
// Uncache the big file and verify that the small file can now be
// cached (regression test for HDFS-6107)
dfs.removeCacheDirective(bigCacheDirectiveId);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
RemoteIterator<CacheDirectiveEntry> iter;
try {
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().build());
CacheDirectiveEntry entry;
do {
entry = iter.next();
} while (entry.getInfo().getId() != shortCacheDirectiveId);
if (entry.getStats().getFilesCached() != 1) {
LOG.info("waiting for directive " + shortCacheDirectiveId +
" to be cached. stats = " + entry.getStats());
return false;
}
LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
} catch (IOException e) {
Assert.fail("unexpected exception" + e.toString());
}
return true;
}
}, 1000, 30000);
}
}