HDFS-5366. recaching improvements (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541647 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-11-13 18:18:37 +00:00
parent 744506f34d
commit 3c591aa442
12 changed files with 228 additions and 86 deletions

View File

@ -114,7 +114,17 @@ public static class POSIX {
public static interface CacheTracker { public static interface CacheTracker {
public void fadvise(String identifier, long offset, long len, int flags); public void fadvise(String identifier, long offset, long len, int flags);
} }
public static CacheManipulator cacheManipulator = new CacheManipulator();
@VisibleForTesting
public static class CacheManipulator {
public void mlock(String identifier, ByteBuffer buffer,
long len) throws IOException {
POSIX.mlock(buffer, len);
}
}
static { static {
if (NativeCodeLoader.isNativeCodeLoaded()) { if (NativeCodeLoader.isNativeCodeLoaded()) {
try { try {
@ -249,7 +259,7 @@ static native void munlock_native(
* *
* @throws NativeIOException * @throws NativeIOException
*/ */
public static void mlock(ByteBuffer buffer, long len) static void mlock(ByteBuffer buffer, long len)
throws IOException { throws IOException {
assertCodeLoaded(); assertCodeLoaded();
if (!buffer.isDirect()) { if (!buffer.isDirect()) {

View File

@ -194,6 +194,8 @@ Trunk (Unreleased)
HDFS-5485. Add command-line support for modifyDirective. (cmccabe) HDFS-5485. Add command-line support for modifyDirective. (cmccabe)
HDFS-5366. recaching improvements (cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -342,6 +342,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 60000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval"; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30; public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval"; public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";

View File

@ -204,6 +204,7 @@ private void rescan() {
namesystem.writeLock(); namesystem.writeLock();
try { try {
rescanCachedBlockMap(); rescanCachedBlockMap();
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }
@ -316,17 +317,21 @@ private void rescanCachedBlockMap() {
int numCached = cached.size(); int numCached = cached.size();
if (numCached >= neededCached) { if (numCached >= neededCached) {
// If we have enough replicas, drop all pending cached. // If we have enough replicas, drop all pending cached.
for (DatanodeDescriptor datanode : pendingCached) { for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
iter.hasNext(); ) {
DatanodeDescriptor datanode = iter.next();
datanode.getPendingCached().remove(cblock); datanode.getPendingCached().remove(cblock);
iter.remove();
} }
pendingCached.clear();
} }
if (numCached < neededCached) { if (numCached < neededCached) {
// If we don't have enough replicas, drop all pending uncached. // If we don't have enough replicas, drop all pending uncached.
for (DatanodeDescriptor datanode : pendingUncached) { for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
iter.hasNext(); ) {
DatanodeDescriptor datanode = iter.next();
datanode.getPendingUncached().remove(cblock); datanode.getPendingUncached().remove(cblock);
iter.remove();
} }
pendingUncached.clear();
} }
int neededUncached = numCached - int neededUncached = numCached -
(pendingUncached.size() + neededCached); (pendingUncached.size() + neededCached);

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/** /**
* This class extends the DatanodeInfo class with ephemeral information (eg * This class extends the DatanodeInfo class with ephemeral information (eg
@ -159,6 +158,12 @@ public CachedBlocksList getPendingUncached() {
return pendingUncached; return pendingUncached;
} }
/**
* The time when the last batch of caching directives was sent, in
* monotonic milliseconds.
*/
private long lastCachingDirectiveSentTimeMs;
/** /**
* Head of the list of blocks on the datanode * Head of the list of blocks on the datanode
*/ */
@ -696,4 +701,20 @@ public String dumpDatanode() {
} }
return sb.toString(); return sb.toString();
} }
/**
* @return The time at which we last sent caching directives to this
* DataNode, in monotonic milliseconds.
*/
public long getLastCachingDirectiveSentTimeMs() {
return this.lastCachingDirectiveSentTimeMs;
}
/**
* @param time The time at which we last sent caching directives to this
* DataNode, in monotonic milliseconds.
*/
public void setLastCachingDirectiveSentTimeMs(long time) {
this.lastCachingDirectiveSentTimeMs = time;
}
} }

View File

@ -149,7 +149,7 @@ public class DatanodeManager {
* Whether we should tell datanodes what to cache in replies to * Whether we should tell datanodes what to cache in replies to
* heartbeat messages. * heartbeat messages.
*/ */
private boolean sendCachingCommands = false; private boolean shouldSendCachingCommands = false;
/** /**
* The number of datanodes for each software version. This list should change * The number of datanodes for each software version. This list should change
@ -159,6 +159,16 @@ public class DatanodeManager {
private HashMap<String, Integer> datanodesSoftwareVersions = private HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<String, Integer>(4, 0.75f); new HashMap<String, Integer>(4, 0.75f);
/**
* The minimum time between resending caching directives to Datanodes,
* in milliseconds.
*
* Note that when a rescan happens, we will send the new directives
* as soon as possible. This timeout only applies to resending
* directives that we've already sent.
*/
private final long timeBetweenResendingCachingDirectivesMs;
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException { final Configuration conf) throws IOException {
this.namesystem = namesystem; this.namesystem = namesystem;
@ -241,6 +251,9 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f."); "It should be a positive non-zero float value, not greater than 1.0f.");
this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
} }
private static long getStaleIntervalFromConf(Configuration conf, private static long getStaleIntervalFromConf(Configuration conf,
@ -1307,17 +1320,28 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks)); blockPoolId, blks));
} }
DatanodeCommand pendingCacheCommand = boolean sendingCachingCommands = false;
getCacheCommand(nodeinfo.getPendingCached(), nodeinfo, long nowMs = Time.monotonicNow();
DatanodeProtocol.DNA_CACHE, blockPoolId); if (shouldSendCachingCommands &&
if (pendingCacheCommand != null) { ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
cmds.add(pendingCacheCommand); timeBetweenResendingCachingDirectivesMs)) {
} DatanodeCommand pendingCacheCommand =
DatanodeCommand pendingUncacheCommand = getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo, DatanodeProtocol.DNA_CACHE, blockPoolId);
DatanodeProtocol.DNA_UNCACHE, blockPoolId); if (pendingCacheCommand != null) {
if (pendingUncacheCommand != null) { cmds.add(pendingCacheCommand);
cmds.add(pendingUncacheCommand); sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand =
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
DatanodeProtocol.DNA_UNCACHE, blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
} }
blockManager.addKeyUpdateCommand(cmds, nodeinfo); blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@ -1355,19 +1379,13 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
if (length == 0) { if (length == 0) {
return null; return null;
} }
// Read and clear the existing cache commands. // Read the existing cache commands.
long[] blockIds = new long[length]; long[] blockIds = new long[length];
int i = 0; int i = 0;
for (Iterator<CachedBlock> iter = list.iterator(); for (Iterator<CachedBlock> iter = list.iterator();
iter.hasNext(); ) { iter.hasNext(); ) {
CachedBlock cachedBlock = iter.next(); CachedBlock cachedBlock = iter.next();
blockIds[i++] = cachedBlock.getBlockId(); blockIds[i++] = cachedBlock.getBlockId();
iter.remove();
}
if (!sendCachingCommands) {
// Do not send caching commands unless the FSNamesystem told us we
// should.
return null;
} }
return new BlockIdCommand(action, poolId, blockIds); return new BlockIdCommand(action, poolId, blockIds);
} }
@ -1416,12 +1434,24 @@ public void clearPendingQueues() {
} }
} }
/**
* Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we
* know about.
*/
public void resetLastCachingDirectiveSentTime() {
synchronized (datanodeMap) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setLastCachingDirectiveSentTimeMs(0L);
}
}
}
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap; return getClass().getSimpleName() + ": " + host2DatanodeMap;
} }
public void setSendCachingCommands(boolean sendCachingCommands) { public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.sendCachingCommands = sendCachingCommands; this.shouldSendCachingCommands = shouldSendCachingCommands;
} }
} }

View File

@ -289,6 +289,10 @@ synchronized void cacheBlock(long blockId, String bpid,
mappableBlockMap.put(key, new Value(null, State.CACHING)); mappableBlockMap.put(key, new Value(null, State.CACHING));
volumeExecutor.execute( volumeExecutor.execute(
new CachingTask(key, blockFileName, length, genstamp)); new CachingTask(key, blockFileName, length, genstamp));
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating caching for Block with id " + blockId +
", pool " + bpid);
}
} }
synchronized void uncacheBlock(String bpid, long blockId) { synchronized void uncacheBlock(String bpid, long blockId) {
@ -427,6 +431,10 @@ public void run() {
mappableBlock.close(); mappableBlock.close();
} }
numBlocksFailedToCache.incrementAndGet(); numBlocksFailedToCache.incrementAndGet();
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
} }
} }
} }

View File

@ -44,20 +44,6 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class MappableBlock implements Closeable { public class MappableBlock implements Closeable {
public static interface Mlocker {
void mlock(MappedByteBuffer mmap, long length) throws IOException;
}
private static class PosixMlocker implements Mlocker {
public void mlock(MappedByteBuffer mmap, long length)
throws IOException {
NativeIO.POSIX.mlock(mmap, length);
}
}
@VisibleForTesting
public static Mlocker mlocker = new PosixMlocker();
private MappedByteBuffer mmap; private MappedByteBuffer mmap;
private final long length; private final long length;
@ -96,7 +82,7 @@ public static MappableBlock load(long length,
throw new IOException("Block InputStream has no FileChannel."); throw new IOException("Block InputStream has no FileChannel.");
} }
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length); mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
mlocker.mlock(mmap, length); NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName); verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length); mappableBlock = new MappableBlock(mmap, length);
} finally { } finally {

View File

@ -1014,7 +1014,7 @@ void startActiveServices() throws IOException {
nnEditLogRoller.start(); nnEditLogRoller.start();
cacheManager.activate(); cacheManager.activate();
blockManager.getDatanodeManager().setSendCachingCommands(true); blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally { } finally {
writeUnlock(); writeUnlock();
startingActiveService = false; startingActiveService = false;
@ -1065,7 +1065,7 @@ void stopActiveServices() {
dir.fsImage.updateLastAppliedTxIdFromWritten(); dir.fsImage.updateLastAppliedTxIdFromWritten();
} }
cacheManager.deactivate(); cacheManager.deactivate();
blockManager.getDatanodeManager().setSendCachingCommands(false); blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
} finally { } finally {
writeUnlock(); writeUnlock();
} }

View File

@ -1516,6 +1516,18 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.path.based.cache.retry.interval.ms</name>
<value>60000</value>
<description>
When the NameNode needs to uncache something that is cached, or cache
something that is not cached, it must direct the DataNodes to do so by
sending a DNA_CACHE or DNA_UNCACHE command in response to a DataNode
heartbeat. This parameter controls how frequently the NameNode will
resend these commands.
</description>
</property>
<property> <property>
<name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name> <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
<value>4</value> <value>4</value>

View File

@ -28,8 +28,10 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.MappedByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -49,7 +51,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@ -60,6 +61,7 @@
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MetricsAsserts;
@ -87,8 +89,7 @@ public class TestFsDatasetCache {
private static FsDatasetSpi<?> fsd; private static FsDatasetSpi<?> fsd;
private static DatanodeProtocolClientSideTranslatorPB spyNN; private static DatanodeProtocolClientSideTranslatorPB spyNN;
private static PageRounder rounder = new PageRounder(); private static PageRounder rounder = new PageRounder();
private static CacheManipulator prevCacheManipulator;
private Mlocker mlocker;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -96,6 +97,8 @@ public void setUp() throws Exception {
assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY); assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY); CACHE_CAPACITY);
@ -113,8 +116,19 @@ public void setUp() throws Exception {
fsd = dn.getFSDataset(); fsd = dn.getFSDataset();
spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn); spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
// Save the current mlocker and replace it at the end of the test
mlocker = MappableBlock.mlocker; prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
// Save the current CacheManipulator and replace it at the end of the test
// Stub out mlock calls to avoid failing when not enough memory is lockable
// by the operating system.
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
LOG.info("mlocking " + identifier);
}
};
} }
@After @After
@ -125,8 +139,8 @@ public void tearDown() throws Exception {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
// Restore the original mlocker // Restore the original CacheManipulator
MappableBlock.mlocker = mlocker; NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
} }
private static void setHeartbeatResponse(DatanodeCommand[] cmds) private static void setHeartbeatResponse(DatanodeCommand[] cmds)
@ -214,8 +228,7 @@ public Boolean get() {
return expected; return expected;
} }
@Test(timeout=600000) private void testCacheAndUncacheBlock() throws Exception {
public void testCacheAndUncacheBlock() throws Exception {
LOG.info("beginning testCacheAndUncacheBlock"); LOG.info("beginning testCacheAndUncacheBlock");
final int NUM_BLOCKS = 5; final int NUM_BLOCKS = 5;
@ -268,6 +281,42 @@ public void testCacheAndUncacheBlock() throws Exception {
LOG.info("finishing testCacheAndUncacheBlock"); LOG.info("finishing testCacheAndUncacheBlock");
} }
@Test(timeout=600000)
public void testCacheAndUncacheBlockSimple() throws Exception {
testCacheAndUncacheBlock();
}
/**
* Run testCacheAndUncacheBlock with some failures injected into the mlock
* call. This tests the ability of the NameNode to resend commands.
*/
@Test(timeout=600000)
public void testCacheAndUncacheBlockWithRetries() throws Exception {
CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
try {
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
private final Set<String> seenIdentifiers = new HashSet<String>();
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
if (seenIdentifiers.contains(identifier)) {
// mlock succeeds the second time.
LOG.info("mlocking " + identifier);
return;
}
seenIdentifiers.add(identifier);
throw new IOException("injecting IOException during mlock of " +
identifier);
}
};
testCacheAndUncacheBlock();
} finally {
NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
}
}
@Test(timeout=600000) @Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception { public void testFilesExceedMaxLockedMemory() throws Exception {
LOG.info("beginning testFilesExceedMaxLockedMemory"); LOG.info("beginning testFilesExceedMaxLockedMemory");
@ -357,10 +406,11 @@ public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed); assertEquals("Unexpected amount of cache used", current, cacheUsed);
MappableBlock.mlocker = new MappableBlock.Mlocker() { NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() {
@Override @Override
public void mlock(MappedByteBuffer mmap, long length) throws IOException { public void mlock(String identifier,
LOG.info("An mlock operation is starting."); ByteBuffer mmap, long length) throws IOException {
LOG.info("An mlock operation is starting on " + identifier);
try { try {
Thread.sleep(3000); Thread.sleep(3000);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -33,6 +33,7 @@
import java.io.IOException; import java.io.IOException;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -60,6 +61,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -81,15 +83,7 @@ public class TestPathBasedCacheRequests {
static private MiniDFSCluster cluster; static private MiniDFSCluster cluster;
static private DistributedFileSystem dfs; static private DistributedFileSystem dfs;
static private NamenodeProtocols proto; static private NamenodeProtocols proto;
static private CacheManipulator prevCacheManipulator;
static {
MappableBlock.mlocker = new MappableBlock.Mlocker() {
@Override
public void mlock(MappedByteBuffer mmap, long length) throws IOException {
// Stubbed out for testing
}
};
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
@ -101,6 +95,18 @@ public void setup() throws Exception {
cluster.waitActive(); cluster.waitActive();
dfs = cluster.getFileSystem(); dfs = cluster.getFileSystem();
proto = cluster.getNameNodeRpc(); proto = cluster.getNameNodeRpc();
prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
// Save the current CacheManipulator and replace it at the end of the test
// Stub out mlock calls to avoid failing when not enough memory is lockable
// by the operating system.
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
LOG.info("mlocking " + identifier);
}
};
} }
@After @After
@ -108,6 +114,8 @@ public void teardown() throws Exception {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
// Restore the original CacheManipulator
NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -552,8 +560,8 @@ public void testCacheManagerRestart() throws Exception {
* @throws Exception * @throws Exception
*/ */
private static void waitForCachedBlocks(NameNode nn, private static void waitForCachedBlocks(NameNode nn,
final int expectedCachedBlocks, final int expectedCachedReplicas) final int expectedCachedBlocks, final int expectedCachedReplicas,
throws Exception { final String logString) throws Exception {
final FSNamesystem namesystem = nn.getNamesystem(); final FSNamesystem namesystem = nn.getNamesystem();
final CacheManager cacheManager = namesystem.getCacheManager(); final CacheManager cacheManager = namesystem.getCacheManager();
LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " + LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
@ -581,9 +589,9 @@ public Boolean get() {
(numCachedReplicas == expectedCachedReplicas)) { (numCachedReplicas == expectedCachedReplicas)) {
return true; return true;
} else { } else {
LOG.info("cached blocks: have " + numCachedBlocks + LOG.info(logString + " cached blocks: have " + numCachedBlocks +
" / " + expectedCachedBlocks); " / " + expectedCachedBlocks + ". " +
LOG.info("cached replicas: have " + numCachedReplicas + "cached replicas: have " + numCachedReplicas +
" / " + expectedCachedReplicas); " / " + expectedCachedReplicas);
return false; return false;
} }
@ -681,7 +689,7 @@ public void testWaitForCachedReplicas() throws Exception {
paths.add(p.toUri().getPath()); paths.add(p.toUri().getPath());
} }
// Check the initial statistics at the namenode // Check the initial statistics at the namenode
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
// Cache and check each path in sequence // Cache and check each path in sequence
int expected = 0; int expected = 0;
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
@ -692,7 +700,8 @@ public void testWaitForCachedReplicas() throws Exception {
build(); build();
nnRpc.addPathBasedCacheDirective(directive); nnRpc.addPathBasedCacheDirective(directive);
expected += numBlocksPerFile; expected += numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected); waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:1");
} }
// Uncache and check each path in sequence // Uncache and check each path in sequence
RemoteIterator<PathBasedCacheDirective> entries = RemoteIterator<PathBasedCacheDirective> entries =
@ -701,7 +710,8 @@ public void testWaitForCachedReplicas() throws Exception {
PathBasedCacheDirective directive = entries.next(); PathBasedCacheDirective directive = entries.next();
nnRpc.removePathBasedCacheDirective(directive.getId()); nnRpc.removePathBasedCacheDirective(directive.getId());
expected -= numBlocksPerFile; expected -= numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected); waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:2");
} }
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
@ -735,7 +745,8 @@ public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
paths.add(p.toUri().getPath()); paths.add(p.toUri().getPath());
} }
// Check the initial statistics at the namenode // Check the initial statistics at the namenode
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:0");
// Cache and check each path in sequence // Cache and check each path in sequence
int expected = 0; int expected = 0;
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
@ -745,10 +756,12 @@ public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
setPool(pool). setPool(pool).
build(); build();
dfs.addPathBasedCacheDirective(directive); dfs.addPathBasedCacheDirective(directive);
waitForCachedBlocks(namenode, expected, 0); waitForCachedBlocks(namenode, expected, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:1");
} }
Thread.sleep(20000); Thread.sleep(20000);
waitForCachedBlocks(namenode, expected, 0); waitForCachedBlocks(namenode, expected, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:2");
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -781,7 +794,8 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false); (int)BLOCK_SIZE, (short)3, false);
} }
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:0");
// cache entire directory // cache entire directory
long id = dfs.addPathBasedCacheDirective( long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder(). new PathBasedCacheDirective.Builder().
@ -789,10 +803,12 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
setReplication((short)2). setReplication((short)2).
setPool(pool). setPool(pool).
build()); build());
waitForCachedBlocks(namenode, 4, 8); waitForCachedBlocks(namenode, 4, 8,
"testWaitForCachedReplicasInDirectory:1");
// remove and watch numCached go to 0 // remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id); dfs.removePathBasedCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:2");
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -830,7 +846,7 @@ public void testReplicationFactor() throws Exception {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile, FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false); (int)BLOCK_SIZE, (short)3, false);
} }
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
checkNumCachedReplicas(dfs, paths, 0, 0); checkNumCachedReplicas(dfs, paths, 0, 0);
// cache directory // cache directory
long id = dfs.addPathBasedCacheDirective( long id = dfs.addPathBasedCacheDirective(
@ -839,7 +855,7 @@ public void testReplicationFactor() throws Exception {
setReplication((short)1). setReplication((short)1).
setPool(pool). setPool(pool).
build()); build());
waitForCachedBlocks(namenode, 4, 4); waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
checkNumCachedReplicas(dfs, paths, 4, 4); checkNumCachedReplicas(dfs, paths, 4, 4);
// step up the replication factor // step up the replication factor
for (int i=2; i<=3; i++) { for (int i=2; i<=3; i++) {
@ -848,7 +864,7 @@ public void testReplicationFactor() throws Exception {
setId(id). setId(id).
setReplication((short)i). setReplication((short)i).
build()); build());
waitForCachedBlocks(namenode, 4, 4*i); waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
checkNumCachedReplicas(dfs, paths, 4, 4*i); checkNumCachedReplicas(dfs, paths, 4, 4*i);
} }
// step it down // step it down
@ -858,12 +874,12 @@ public void testReplicationFactor() throws Exception {
setId(id). setId(id).
setReplication((short)i). setReplication((short)i).
build()); build());
waitForCachedBlocks(namenode, 4, 4*i); waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
checkNumCachedReplicas(dfs, paths, 4, 4*i); checkNumCachedReplicas(dfs, paths, 4, 4*i);
} }
// remove and watch numCached go to 0 // remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id); dfs.removePathBasedCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0); waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
checkNumCachedReplicas(dfs, paths, 0, 0); checkNumCachedReplicas(dfs, paths, 0, 0);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();