diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 820f1068977..8a77bede4f2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -114,7 +114,17 @@ public class NativeIO { public static interface CacheTracker { public void fadvise(String identifier, long offset, long len, int flags); } - + + public static CacheManipulator cacheManipulator = new CacheManipulator(); + + @VisibleForTesting + public static class CacheManipulator { + public void mlock(String identifier, ByteBuffer buffer, + long len) throws IOException { + POSIX.mlock(buffer, len); + } + } + static { if (NativeCodeLoader.isNativeCodeLoaded()) { try { @@ -249,7 +259,7 @@ public class NativeIO { * * @throws NativeIOException */ - public static void mlock(ByteBuffer buffer, long len) + static void mlock(ByteBuffer buffer, long len) throws IOException { assertCodeLoaded(); if (!buffer.isDirect()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7586f950a56..044ff44ddf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -194,6 +194,8 @@ Trunk (Unreleased) HDFS-5485. Add command-line support for modifyDirective. (cmccabe) + HDFS-5366. recaching improvements (cmccabe) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 0483ce8de0b..10a46aa72d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -342,6 +342,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3; + public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms"; + public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 60000L; public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval"; public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30; public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index 5d9c39c16a0..1cef3ee9251 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -204,6 +204,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { namesystem.writeLock(); try { rescanCachedBlockMap(); + blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime(); } finally { namesystem.writeUnlock(); } @@ -316,17 +317,21 @@ public class CacheReplicationMonitor extends Thread implements Closeable { int numCached = cached.size(); if (numCached >= neededCached) { // If we have enough replicas, drop all pending cached. - for (DatanodeDescriptor datanode : pendingCached) { + for (Iterator iter = pendingCached.iterator(); + iter.hasNext(); ) { + DatanodeDescriptor datanode = iter.next(); datanode.getPendingCached().remove(cblock); + iter.remove(); } - pendingCached.clear(); } if (numCached < neededCached) { // If we don't have enough replicas, drop all pending uncached. - for (DatanodeDescriptor datanode : pendingUncached) { + for (Iterator iter = pendingUncached.iterator(); + iter.hasNext(); ) { + DatanodeDescriptor datanode = iter.next(); datanode.getPendingUncached().remove(cblock); + iter.remove(); } - pendingUncached.clear(); } int neededUncached = numCached - (pendingUncached.size() + neededCached); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index f7b43e4e05b..cf4a12cccba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -34,7 +34,6 @@ import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; /** * This class extends the DatanodeInfo class with ephemeral information (eg @@ -159,6 +158,12 @@ public class DatanodeDescriptor extends DatanodeInfo { return pendingUncached; } + /** + * The time when the last batch of caching directives was sent, in + * monotonic milliseconds. + */ + private long lastCachingDirectiveSentTimeMs; + /** * Head of the list of blocks on the datanode */ @@ -696,4 +701,20 @@ public class DatanodeDescriptor extends DatanodeInfo { } return sb.toString(); } + + /** + * @return The time at which we last sent caching directives to this + * DataNode, in monotonic milliseconds. + */ + public long getLastCachingDirectiveSentTimeMs() { + return this.lastCachingDirectiveSentTimeMs; + } + + /** + * @param time The time at which we last sent caching directives to this + * DataNode, in monotonic milliseconds. + */ + public void setLastCachingDirectiveSentTimeMs(long time) { + this.lastCachingDirectiveSentTimeMs = time; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index bfbee7e45c2..9832e52e915 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -149,7 +149,7 @@ public class DatanodeManager { * Whether we should tell datanodes what to cache in replies to * heartbeat messages. */ - private boolean sendCachingCommands = false; + private boolean shouldSendCachingCommands = false; /** * The number of datanodes for each software version. This list should change @@ -159,6 +159,16 @@ public class DatanodeManager { private HashMap datanodesSoftwareVersions = new HashMap(4, 0.75f); + /** + * The minimum time between resending caching directives to Datanodes, + * in milliseconds. + * + * Note that when a rescan happens, we will send the new directives + * as soon as possible. This timeout only applies to resending + * directives that we've already sent. + */ + private final long timeBetweenResendingCachingDirectivesMs; + DatanodeManager(final BlockManager blockManager, final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -241,6 +251,9 @@ public class DatanodeManager { DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + "It should be a positive non-zero float value, not greater than 1.0f."); + this.timeBetweenResendingCachingDirectivesMs = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS, + DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT); } private static long getStaleIntervalFromConf(Configuration conf, @@ -1307,17 +1320,28 @@ public class DatanodeManager { cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)); } - DatanodeCommand pendingCacheCommand = - getCacheCommand(nodeinfo.getPendingCached(), nodeinfo, - DatanodeProtocol.DNA_CACHE, blockPoolId); - if (pendingCacheCommand != null) { - cmds.add(pendingCacheCommand); - } - DatanodeCommand pendingUncacheCommand = - getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo, - DatanodeProtocol.DNA_UNCACHE, blockPoolId); - if (pendingUncacheCommand != null) { - cmds.add(pendingUncacheCommand); + boolean sendingCachingCommands = false; + long nowMs = Time.monotonicNow(); + if (shouldSendCachingCommands && + ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >= + timeBetweenResendingCachingDirectivesMs)) { + DatanodeCommand pendingCacheCommand = + getCacheCommand(nodeinfo.getPendingCached(), nodeinfo, + DatanodeProtocol.DNA_CACHE, blockPoolId); + if (pendingCacheCommand != null) { + cmds.add(pendingCacheCommand); + sendingCachingCommands = true; + } + DatanodeCommand pendingUncacheCommand = + getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo, + DatanodeProtocol.DNA_UNCACHE, blockPoolId); + if (pendingUncacheCommand != null) { + cmds.add(pendingUncacheCommand); + sendingCachingCommands = true; + } + if (sendingCachingCommands) { + nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs); + } } blockManager.addKeyUpdateCommand(cmds, nodeinfo); @@ -1355,19 +1379,13 @@ public class DatanodeManager { if (length == 0) { return null; } - // Read and clear the existing cache commands. + // Read the existing cache commands. long[] blockIds = new long[length]; int i = 0; for (Iterator iter = list.iterator(); iter.hasNext(); ) { CachedBlock cachedBlock = iter.next(); blockIds[i++] = cachedBlock.getBlockId(); - iter.remove(); - } - if (!sendCachingCommands) { - // Do not send caching commands unless the FSNamesystem told us we - // should. - return null; } return new BlockIdCommand(action, poolId, blockIds); } @@ -1416,12 +1434,24 @@ public class DatanodeManager { } } + /** + * Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we + * know about. + */ + public void resetLastCachingDirectiveSentTime() { + synchronized (datanodeMap) { + for (DatanodeDescriptor dn : datanodeMap.values()) { + dn.setLastCachingDirectiveSentTimeMs(0L); + } + } + } + @Override public String toString() { return getClass().getSimpleName() + ": " + host2DatanodeMap; } - public void setSendCachingCommands(boolean sendCachingCommands) { - this.sendCachingCommands = sendCachingCommands; + public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) { + this.shouldSendCachingCommands = shouldSendCachingCommands; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 3e12168e22d..48ac9507b17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -289,6 +289,10 @@ public class FsDatasetCache { mappableBlockMap.put(key, new Value(null, State.CACHING)); volumeExecutor.execute( new CachingTask(key, blockFileName, length, genstamp)); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating caching for Block with id " + blockId + + ", pool " + bpid); + } } synchronized void uncacheBlock(String bpid, long blockId) { @@ -427,6 +431,10 @@ public class FsDatasetCache { mappableBlock.close(); } numBlocksFailedToCache.incrementAndGet(); + + synchronized (FsDatasetCache.this) { + mappableBlockMap.remove(key); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index 09d2ed6d5e4..29d472335a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -44,20 +44,6 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private @InterfaceStability.Unstable public class MappableBlock implements Closeable { - public static interface Mlocker { - void mlock(MappedByteBuffer mmap, long length) throws IOException; - } - - private static class PosixMlocker implements Mlocker { - public void mlock(MappedByteBuffer mmap, long length) - throws IOException { - NativeIO.POSIX.mlock(mmap, length); - } - } - - @VisibleForTesting - public static Mlocker mlocker = new PosixMlocker(); - private MappedByteBuffer mmap; private final long length; @@ -96,7 +82,7 @@ public class MappableBlock implements Closeable { throw new IOException("Block InputStream has no FileChannel."); } mmap = blockChannel.map(MapMode.READ_ONLY, 0, length); - mlocker.mlock(mmap, length); + NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length); verifyChecksum(length, metaIn, blockChannel, blockFileName); mappableBlock = new MappableBlock(mmap, length); } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d4610827cd7..fcc66c63d62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1014,7 +1014,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, nnEditLogRoller.start(); cacheManager.activate(); - blockManager.getDatanodeManager().setSendCachingCommands(true); + blockManager.getDatanodeManager().setShouldSendCachingCommands(true); } finally { writeUnlock(); startingActiveService = false; @@ -1065,7 +1065,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, dir.fsImage.updateLastAppliedTxIdFromWritten(); } cacheManager.deactivate(); - blockManager.getDatanodeManager().setSendCachingCommands(false); + blockManager.getDatanodeManager().setShouldSendCachingCommands(false); } finally { writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 45133d09967..dbf4f4433cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1516,6 +1516,18 @@ + + dfs.namenode.path.based.cache.retry.interval.ms + 60000 + + When the NameNode needs to uncache something that is cached, or cache + something that is not cached, it must direct the DataNodes to do so by + sending a DNA_CACHE or DNA_UNCACHE command in response to a DataNode + heartbeat. This parameter controls how frequently the NameNode will + resend these commands. + + + dfs.datanode.fsdatasetcache.max.threads.per.volume 4 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index b0e907fc1f7..a6909cb4788 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -28,8 +28,10 @@ import static org.mockito.Mockito.doReturn; import java.io.FileInputStream; import java.io.IOException; -import java.nio.MappedByteBuffer; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +51,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; @@ -87,8 +89,7 @@ public class TestFsDatasetCache { private static FsDatasetSpi fsd; private static DatanodeProtocolClientSideTranslatorPB spyNN; private static PageRounder rounder = new PageRounder(); - - private Mlocker mlocker; + private static CacheManipulator prevCacheManipulator; @Before public void setUp() throws Exception { @@ -96,6 +97,8 @@ public class TestFsDatasetCache { assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY); conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS, + 500); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY); @@ -113,8 +116,19 @@ public class TestFsDatasetCache { fsd = dn.getFSDataset(); spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn); - // Save the current mlocker and replace it at the end of the test - mlocker = MappableBlock.mlocker; + + prevCacheManipulator = NativeIO.POSIX.cacheManipulator; + + // Save the current CacheManipulator and replace it at the end of the test + // Stub out mlock calls to avoid failing when not enough memory is lockable + // by the operating system. + NativeIO.POSIX.cacheManipulator = new CacheManipulator() { + @Override + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + LOG.info("mlocking " + identifier); + } + }; } @After @@ -125,8 +139,8 @@ public class TestFsDatasetCache { if (cluster != null) { cluster.shutdown(); } - // Restore the original mlocker - MappableBlock.mlocker = mlocker; + // Restore the original CacheManipulator + NativeIO.POSIX.cacheManipulator = prevCacheManipulator; } private static void setHeartbeatResponse(DatanodeCommand[] cmds) @@ -214,8 +228,7 @@ public class TestFsDatasetCache { return expected; } - @Test(timeout=600000) - public void testCacheAndUncacheBlock() throws Exception { + private void testCacheAndUncacheBlock() throws Exception { LOG.info("beginning testCacheAndUncacheBlock"); final int NUM_BLOCKS = 5; @@ -268,6 +281,42 @@ public class TestFsDatasetCache { LOG.info("finishing testCacheAndUncacheBlock"); } + @Test(timeout=600000) + public void testCacheAndUncacheBlockSimple() throws Exception { + testCacheAndUncacheBlock(); + } + + /** + * Run testCacheAndUncacheBlock with some failures injected into the mlock + * call. This tests the ability of the NameNode to resend commands. + */ + @Test(timeout=600000) + public void testCacheAndUncacheBlockWithRetries() throws Exception { + CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator; + + try { + NativeIO.POSIX.cacheManipulator = new CacheManipulator() { + private final Set seenIdentifiers = new HashSet(); + + @Override + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + if (seenIdentifiers.contains(identifier)) { + // mlock succeeds the second time. + LOG.info("mlocking " + identifier); + return; + } + seenIdentifiers.add(identifier); + throw new IOException("injecting IOException during mlock of " + + identifier); + } + }; + testCacheAndUncacheBlock(); + } finally { + NativeIO.POSIX.cacheManipulator = prevCacheManipulator; + } + } + @Test(timeout=600000) public void testFilesExceedMaxLockedMemory() throws Exception { LOG.info("beginning testFilesExceedMaxLockedMemory"); @@ -357,10 +406,11 @@ public class TestFsDatasetCache { assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected amount of cache used", current, cacheUsed); - MappableBlock.mlocker = new MappableBlock.Mlocker() { + NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() { @Override - public void mlock(MappedByteBuffer mmap, long length) throws IOException { - LOG.info("An mlock operation is starting."); + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + LOG.info("An mlock operation is starting on " + identifier); try { Thread.sleep(3000); } catch (InterruptedException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java index 890ccfb525c..7182aad7043 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java @@ -33,6 +33,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.nio.MappedByteBuffer; import java.security.PrivilegedExceptionAction; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -60,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBl import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -81,15 +83,7 @@ public class TestPathBasedCacheRequests { static private MiniDFSCluster cluster; static private DistributedFileSystem dfs; static private NamenodeProtocols proto; - - static { - MappableBlock.mlocker = new MappableBlock.Mlocker() { - @Override - public void mlock(MappedByteBuffer mmap, long length) throws IOException { - // Stubbed out for testing - } - }; - } + static private CacheManipulator prevCacheManipulator; @Before public void setup() throws Exception { @@ -101,6 +95,18 @@ public class TestPathBasedCacheRequests { cluster.waitActive(); dfs = cluster.getFileSystem(); proto = cluster.getNameNodeRpc(); + prevCacheManipulator = NativeIO.POSIX.cacheManipulator; + + // Save the current CacheManipulator and replace it at the end of the test + // Stub out mlock calls to avoid failing when not enough memory is lockable + // by the operating system. + NativeIO.POSIX.cacheManipulator = new CacheManipulator() { + @Override + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + LOG.info("mlocking " + identifier); + } + }; } @After @@ -108,6 +114,8 @@ public class TestPathBasedCacheRequests { if (cluster != null) { cluster.shutdown(); } + // Restore the original CacheManipulator + NativeIO.POSIX.cacheManipulator = prevCacheManipulator; } @Test(timeout=60000) @@ -552,8 +560,8 @@ public class TestPathBasedCacheRequests { * @throws Exception */ private static void waitForCachedBlocks(NameNode nn, - final int expectedCachedBlocks, final int expectedCachedReplicas) - throws Exception { + final int expectedCachedBlocks, final int expectedCachedReplicas, + final String logString) throws Exception { final FSNamesystem namesystem = nn.getNamesystem(); final CacheManager cacheManager = namesystem.getCacheManager(); LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " + @@ -581,9 +589,9 @@ public class TestPathBasedCacheRequests { (numCachedReplicas == expectedCachedReplicas)) { return true; } else { - LOG.info("cached blocks: have " + numCachedBlocks + - " / " + expectedCachedBlocks); - LOG.info("cached replicas: have " + numCachedReplicas + + LOG.info(logString + " cached blocks: have " + numCachedBlocks + + " / " + expectedCachedBlocks + ". " + + "cached replicas: have " + numCachedReplicas + " / " + expectedCachedReplicas); return false; } @@ -681,7 +689,7 @@ public class TestPathBasedCacheRequests { paths.add(p.toUri().getPath()); } // Check the initial statistics at the namenode - waitForCachedBlocks(namenode, 0, 0); + waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0"); // Cache and check each path in sequence int expected = 0; for (int i=0; i entries = @@ -701,7 +710,8 @@ public class TestPathBasedCacheRequests { PathBasedCacheDirective directive = entries.next(); nnRpc.removePathBasedCacheDirective(directive.getId()); expected -= numBlocksPerFile; - waitForCachedBlocks(namenode, expected, expected); + waitForCachedBlocks(namenode, expected, expected, + "testWaitForCachedReplicas:2"); } } finally { cluster.shutdown(); @@ -735,7 +745,8 @@ public class TestPathBasedCacheRequests { paths.add(p.toUri().getPath()); } // Check the initial statistics at the namenode - waitForCachedBlocks(namenode, 0, 0); + waitForCachedBlocks(namenode, 0, 0, + "testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:0"); // Cache and check each path in sequence int expected = 0; for (int i=0; i