From cad14aa9168112ef1ceae80b94d9aae3ba293578 Mon Sep 17 00:00:00 2001
From: Colin Patrick Mccabe
Date: Mon, 8 Sep 2014 12:51:22 -0700
Subject: [PATCH] HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do no-checksum reads that extend too long. (cmccabe)

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java    |   6 +
 .../server/datanode/ShortCircuitRegistry.java    |  22 ++-
 .../fsdataset/impl/FsDatasetCache.java           | 109 ++++++++--
 .../src/main/resources/hdfs-default.xml          |  21 ++
 .../server/datanode/TestFsDatasetCache.java      |   2 +-
 .../TestFsDatasetCacheRevocation.java            | 187 ++++++++++++++++++
 7 files changed, 334 insertions(+), 16 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2c45017eb8c..1dbfc234c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -449,6 +449,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
     (Ming Ma via wheat9)
 
+    HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do
+    no-checksum reads that extend too long (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 2f86ed6f1b6..8443c7176e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -248,6 +248,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms";
   public static final String DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1, OOB_TYPE2, OOB_TYPE3, OOB_TYPE4
 
+  public static final String DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS = "dfs.datanode.cache.revocation.timeout.ms";
+  public static final long DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT = 900000L;
+
+  public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
+  public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
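The two keys above are plain long-valued settings that the DataNode reads with Configuration.getLong(key, default), falling back to the compiled-in defaults of 900000 ms (15 minutes) and 500 ms. A minimal standalone sketch of that lookup; the class and variable names here are illustrative and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Illustrative only: shows how the new keys resolve to values.
    public class RevocationConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long revocationMs = conf.getLong(
            DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
            DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
        long pollingMs = conf.getLong(
            DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
            DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
        System.out.println("revocation timeout = " + revocationMs
            + " ms, polling interval = " + pollingMs + " ms");
      }
    }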
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
index a252a17855a..ddde22da67c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
@@ -26,6 +26,7 @@ import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -43,6 +44,7 @@ import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.DomainSocketWatcher;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 
@@ -83,11 +85,13 @@ public class ShortCircuitRegistry {
 
   private static class RegisteredShm extends ShortCircuitShm
       implements DomainSocketWatcher.Handler {
+    private final String clientName;
     private final ShortCircuitRegistry registry;
 
-    RegisteredShm(ShmId shmId, FileInputStream stream,
+    RegisteredShm(String clientName, ShmId shmId, FileInputStream stream,
         ShortCircuitRegistry registry) throws IOException {
       super(shmId, stream);
+      this.clientName = clientName;
       this.registry = registry;
     }
 
@@ -100,6 +104,10 @@ public class ShortCircuitRegistry {
       }
       return true;
     }
+
+    String getClientName() {
+      return clientName;
+    }
   }
 
   public synchronized void removeShm(ShortCircuitShm shm) {
@@ -243,6 +251,16 @@ public class ShortCircuitRegistry {
     }
   }
 
+  public synchronized String getClientNames(ExtendedBlockId blockId) {
+    if (!enabled) return "";
+    final HashSet<String> clientNames = new HashSet<String>();
+    final Set<Slot> affectedSlots = slots.get(blockId);
+    for (Slot slot : affectedSlots) {
+      clientNames.add(((RegisteredShm)slot.getShm()).getClientName());
+    }
+    return Joiner.on(",").join(clientNames);
+  }
+
   public static class NewShmInfo implements Closeable {
     public final ShmId shmId;
     public final FileInputStream stream;
@@ -290,7 +308,7 @@ public class ShortCircuitRegistry {
         shmId = ShmId.createRandom();
       } while (segments.containsKey(shmId));
       fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
-      shm = new RegisteredShm(shmId, fis, this);
+      shm = new RegisteredShm(clientName, shmId, fis, this);
     } finally {
       if (shm == null) {
         IOUtils.closeQuietly(fis);
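getClientNames() exists so that the forced-uncache warning in FsDatasetCache can say which clients were still holding a replica. The registry keeps a Guava HashMultimap from block to the shared-memory slots anchored on it, and each slot's RegisteredShm now remembers the name of the client that created it, so the method simply collects the distinct names and joins them with commas. A self-contained sketch of that data flow, using toy stand-ins for ExtendedBlockId and Slot (the class, block, and client names here are illustrative):

    import java.util.HashSet;
    import java.util.Set;

    import com.google.common.base.Joiner;
    import com.google.common.collect.HashMultimap;

    public class ClientNamesSketch {
      // Toy stand-in for a shared-memory slot that remembers its client's name.
      static class Slot {
        final String clientName;
        Slot(String clientName) { this.clientName = clientName; }
      }

      public static void main(String[] args) {
        // blockId -> slots currently anchored on that block (mirrors the registry's multimap).
        HashMultimap<String, Slot> slots = HashMultimap.create();
        slots.put("blk_1073741825", new Slot("DFSClient_NONMAPREDUCE_1"));
        slots.put("blk_1073741825", new Slot("DFSClient_NONMAPREDUCE_2"));

        // Same shape as getClientNames(): distinct names, joined with commas.
        Set<String> names = new HashSet<String>();
        for (Slot slot : slots.get("blk_1073741825")) {
          names.add(slot.clientName);
        }
        System.out.println(Joiner.on(",").join(names));
      }
    }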
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 060aed481a5..4acfc8f01a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
+
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -33,10 +38,12 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,6 +124,12 @@ public class FsDatasetCache {
 
   private final ThreadPoolExecutor uncachingExecutor;
 
+  private final ScheduledThreadPoolExecutor deferredUncachingExecutor;
+
+  private final long revocationMs;
+
+  private final long revocationPollingMs;
+
   /**
    * The approximate amount of cache space in use.
    *
@@ -217,6 +231,24 @@
         new LinkedBlockingQueue<Runnable>(), workerFactory);
     this.uncachingExecutor.allowCoreThreadTimeOut(true);
+    this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
+        1, workerFactory);
+    this.revocationMs = dataset.datanode.getConf().getLong(
+        DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
+        DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
+    long confRevocationPollingMs = dataset.datanode.getConf().getLong(
+        DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
+        DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
+    long minRevocationPollingMs = revocationMs / 2;
+    if (minRevocationPollingMs < confRevocationPollingMs) {
+      throw new RuntimeException("configured value " +
+          confRevocationPollingMs + " for " +
+          DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
+          " is too high. It must not be more than half of the " +
+          "value of " + DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
+          ". Reconfigure this to " + minRevocationPollingMs);
+    }
+    this.revocationPollingMs = confRevocationPollingMs;
   }
 
   /**
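One consequence of the check above: the DataNode refuses to start if dfs.datanode.cache.revocation.polling.ms is more than half of dfs.datanode.cache.revocation.timeout.ms, which ensures the deferred-uncaching task can poll at least a couple of times before its deadline. A standalone sketch of the same arithmetic with the shipped defaults (the names below are illustrative, not DataNode code):

    public class RevocationPollingCheckSketch {
      public static void main(String[] args) {
        long revocationMs = 900000L;  // default dfs.datanode.cache.revocation.timeout.ms
        long pollingMs = 500L;        // default dfs.datanode.cache.revocation.polling.ms

        // Same rule as the FsDatasetCache constructor: the polling interval
        // may be at most half of the revocation timeout.
        if (revocationMs / 2 < pollingMs) {
          throw new RuntimeException("polling interval " + pollingMs
              + " ms is too high; it must be at most " + (revocationMs / 2) + " ms");
        }
        System.out.println("ok: " + pollingMs + " ms <= " + (revocationMs / 2) + " ms");
      }
    }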
@@ -262,13 +294,11 @@ synchronized void uncacheBlock(String bpid, long blockId) {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
+    boolean deferred = false;
 
     if (!dataset.datanode.getShortCircuitRegistry().
             processBlockMunlockRequest(key)) {
-      // TODO: we probably want to forcibly uncache the block (and close the
-      // shm) after a certain timeout has elapsed.
-      LOG.debug("{} is anchored, and can't be uncached now.", key);
-      return;
+      deferred = true;
     }
     if (prevValue == null) {
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -285,12 +315,19 @@
           new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
       break;
     case CACHED:
-      LOG.debug(
-          "Block with id {}, pool {} has been scheduled for uncaching" + ".",
-          blockId, bpid);
       mappableBlockMap.put(key,
           new Value(prevValue.mappableBlock, State.UNCACHING));
-      uncachingExecutor.execute(new UncachingTask(key));
+      if (deferred) {
+        LOG.debug("{} is anchored, and can't be uncached now. Scheduling it " +
+            "for uncaching in {}",
+            key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
+        deferredUncachingExecutor.schedule(
+            new UncachingTask(key, revocationMs),
+            revocationPollingMs, TimeUnit.MILLISECONDS);
+      } else {
+        LOG.debug("{} has been scheduled for immediate uncaching.", key);
+        uncachingExecutor.execute(new UncachingTask(key, 0));
+      }
       break;
     default:
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -403,22 +440,62 @@
   private class UncachingTask implements Runnable {
     private final ExtendedBlockId key;
+    private final long revocationTimeMs;
 
-    UncachingTask(ExtendedBlockId key) {
+    UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
       this.key = key;
+      if (revocationDelayMs == 0) {
+        this.revocationTimeMs = 0;
+      } else {
+        this.revocationTimeMs = revocationDelayMs + Time.monotonicNow();
+      }
+    }
+
+    private boolean shouldDefer() {
+      /* If revocationTimeMs == 0, this is an immediate uncache request.
+       * No clients were anchored at the time we made the request. */
+      if (revocationTimeMs == 0) {
+        return false;
+      }
+      /* Let's check if any clients still have this block anchored. */
+      boolean anchored =
+          !dataset.datanode.getShortCircuitRegistry().
+              processBlockMunlockRequest(key);
+      if (!anchored) {
+        LOG.debug("Uncaching {} now that it is no longer in use " +
+            "by any clients.", key);
+        return false;
+      }
+      long delta = revocationTimeMs - Time.monotonicNow();
+      if (delta < 0) {
+        LOG.warn("Forcibly uncaching {} after {} " +
+            "because client(s) {} refused to stop using it.", key,
+            DurationFormatUtils.formatDurationHMS(revocationMs),
+            dataset.datanode.getShortCircuitRegistry().getClientNames(key));
+        return false;
+      }
+      LOG.info("Replica {} still can't be uncached because some " +
+          "clients continue to use it. Will wait for {}", key,
+          DurationFormatUtils.formatDurationHMS(delta));
+      return true;
     }
 
     @Override
     public void run() {
       Value value;
-
+
+      if (shouldDefer()) {
+        deferredUncachingExecutor.schedule(
+            this, revocationPollingMs, TimeUnit.MILLISECONDS);
+        return;
+      }
+
       synchronized (FsDatasetCache.this) {
         value = mappableBlockMap.get(key);
       }
       Preconditions.checkNotNull(value);
       Preconditions.checkArgument(value.state == State.UNCACHING);
-      // TODO: we will eventually need to do revocation here if any clients
-      // are reading via mmap with checksums enabled. See HDFS-5182.
+
       IOUtils.closeQuietly(value.mappableBlock);
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
@@ -427,7 +504,13 @@
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
-      LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
+      if (revocationTimeMs != 0) {
+        LOG.debug("Deferred uncaching of {} completed. usedBytes = {}",
+            key, newUsedBytes);
+      } else {
+        LOG.debug("Uncaching of {} completed. usedBytes = {}",
+            key, newUsedBytes);
+      }
     }
   }
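The UncachingTask above implements the deferral as a poll-until-deadline loop on a single-threaded ScheduledThreadPoolExecutor: each run either finds the replica un-anchored (uncache now), finds the deadline passed (uncache anyway and log the offending clients), or reschedules itself one polling interval later. A standalone sketch of that pattern, stripped of the HDFS specifics; all names and values here are illustrative:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Defer-and-poll sketch: keep rescheduling until the resource is
    // released or the deadline passes, then revoke.
    public class DeferredRevocationSketch {
      private static final ScheduledThreadPoolExecutor executor =
          new ScheduledThreadPoolExecutor(1);
      private static final long POLL_MS = 500L;

      static void scheduleRevocation(final long deadlineNanos) {
        executor.schedule(new Runnable() {
          @Override
          public void run() {
            if (stillInUse() && System.nanoTime() < deadlineNanos) {
              // Still held and the timeout has not expired: check again
              // after one polling interval.
              executor.schedule(this, POLL_MS, TimeUnit.MILLISECONDS);
              return;
            }
            revoke();  // either released voluntarily, or forced at the deadline
          }
        }, POLL_MS, TimeUnit.MILLISECONDS);
      }

      static boolean stillInUse() { return false; }  // placeholder check
      static void revoke() { System.out.println("revoked"); }

      public static void main(String[] args) throws Exception {
        scheduleRevocation(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(5000));
        Thread.sleep(1000);
        executor.shutdown();
      }
    }

Using a scheduled executor rather than a sleeping thread keeps at most one outstanding task per replica and lets a single thread service many deferred uncaches.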
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 9170ec69ed4..4c2737995e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2116,4 +2116,25 @@
 </property>
 
+<property>
+  <name>dfs.datanode.cache.revocation.timeout.ms</name>
+  <value>900000</value>
+  <description>When the DFSClient reads from a block file which the DataNode is
+    caching, the DFSClient can skip verifying checksums. The DataNode will
+    keep the block file in cache until the client is done. If the client takes
+    an unusually long time, though, the DataNode may need to evict the block
+    file from the cache anyway. This value controls how long the DataNode will
+    wait for the client to release a replica that it is reading without
+    checksums.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.cache.revocation.polling.ms</name>
+  <value>500</value>
+  <description>How often the DataNode should poll to see if the clients have
+    stopped using a replica that the DataNode wants to uncache.
+  </description>
+</property>
+
 </configuration>
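These are only the shipped defaults; a cluster operator can override either value in hdfs-site.xml on the DataNodes. A hypothetical override (the values below are examples, not recommendations) that forces revocation after five minutes while keeping the polling interval within the required half-of-timeout bound:

    <!-- Example hdfs-site.xml override; values are illustrative. -->
    <property>
      <name>dfs.datanode.cache.revocation.timeout.ms</name>
      <value>300000</value>
    </property>
    <property>
      <name>dfs.datanode.cache.revocation.polling.ms</name>
      <value>1000</value>
    </property>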
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index d6e70d80037..c049d81d799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -89,7 +89,7 @@ public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
-  private static final long CACHE_CAPACITY = 64 * 1024;
+  static final long CACHE_CAPACITY = 64 * 1024;
   // mlock always locks the entire page. So we don't need to deal with this
   // rounding, use the OS page size for the block size.
   private static final long PAGE_SIZE =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
new file mode 100644
index 00000000000..af28ed70652
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFsDatasetCacheRevocation {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestFsDatasetCacheRevocation.class);
+
+  private static CacheManipulator prevCacheManipulator;
+
+  private static TemporarySocketDirectory sockDir;
+
+  private static final int BLOCK_SIZE = 4096;
+
+  @Before
+  public void setUp() throws Exception {
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    DomainSocket.disableBindPathValidation();
+    sockDir = new TemporarySocketDirectory();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+    sockDir.close();
+  }
+
+  private static Configuration getDefaultConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 50);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 250);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        TestFsDatasetCache.CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(), "sock").getAbsolutePath());
+    return conf;
+  }
+
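Both tests below drive the DataNode through the same client-side sequence: open the file, take a zero-copy (mmap) read, which anchors the replica in the DataNode's cache, and later hand the buffer back with releaseBuffer(), which un-anchors it. A hypothetical helper showing that sequence in one place; it is not part of the patch, and the tests inline these calls instead:

      // Hypothetical helper, not part of the patch: the anchor/release cycle
      // that testPinning and testRevocation perform inline.
      private static void readZeroCopyAndRelease(DistributedFileSystem dfs,
          Path path, int length) throws Exception {
        FSDataInputStream in = dfs.open(path);
        try {
          // A zero-copy (mmap) read of a cached replica anchors it; the DataNode
          // defers uncaching while the buffer is outstanding.
          ByteBuffer buf = in.read(null, length, EnumSet.noneOf(ReadOption.class));
          try {
            // ... inspect buf ...
          } finally {
            // Hand the buffer back; the replica is no longer anchored and the
            // DataNode may uncache it.
            in.releaseBuffer(buf);
          }
        } finally {
          in.close();
        }
      }

testPinning holds the buffer across an uncache request and asserts the replica stays cached; testRevocation holds it past a shortened revocation timeout and asserts the DataNode uncaches it anyway.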
+  /**
+   * Test that when a client has a replica mmapped, we will not un-mlock that
+   * replica for a reasonable amount of time, even if an uncache request
+   * occurs.
+   */
+  @Test(timeout=120000)
+  public void testPinning() throws Exception {
+    Configuration conf = getDefaultConf();
+    // Set a really long revocation timeout, so that we won't reach it during
+    // this test.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
+        1800000L);
+    // Poll very often
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
+    MiniDFSCluster cluster = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create and cache a file.
+    final String TEST_FILE = "/test_file";
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+        BLOCK_SIZE, (short)1, 0xcafe);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    long cacheDirectiveId =
+        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
+            setPool("pool").setPath(new Path(TEST_FILE)).
+            setReplication((short) 1).build());
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Mmap the file.
+    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    ByteBuffer buf =
+        in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
+
+    // Attempt to uncache file. The file should still be cached.
+    dfs.removeCacheDirective(cacheDirectiveId);
+    Thread.sleep(500);
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Un-mmap the file. The file should be uncached after this.
+    in.releaseBuffer(buf);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    // Cleanup
+    in.close();
+    cluster.shutdown();
+  }
+
+  /**
+   * Test that when we have an uncache request, and the client refuses to
+   * release the replica for a long time, we will un-mlock it.
+   */
+  @Test(timeout=120000)
+  public void testRevocation() throws Exception {
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    Configuration conf = getDefaultConf();
+    // Set a really short revocation timeout.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
+    // Poll very often
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
+    MiniDFSCluster cluster = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create and cache a file.
+    final String TEST_FILE = "/test_file2";
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+        BLOCK_SIZE, (short)1, 0xcafe);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    long cacheDirectiveId =
+        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
+            setPool("pool").setPath(new Path(TEST_FILE)).
+            setReplication((short) 1).build());
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Mmap the file.
+    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    ByteBuffer buf =
+        in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
+
+    // Attempt to uncache file. The file should get uncached.
+    LOG.info("removing cache directive {}", cacheDirectiveId);
+    dfs.removeCacheDirective(cacheDirectiveId);
+    LOG.info("finished removing cache directive {}", cacheDirectiveId);
+    Thread.sleep(1000);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    // Cleanup
+    in.releaseBuffer(buf);
+    in.close();
+    cluster.shutdown();
+  }
+}
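For reference, an individual test class like this can usually be run on its own through Surefire from the hadoop-hdfs module; short-circuit tests generally also need the native hadoop library and a usable Unix domain socket path on the build machine:

    mvn test -Dtest=TestFsDatasetCacheRevocation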