HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do no-checksum reads that extend too long. (cmccabe)

(cherry picked from commit cad14aa916)
Colin Patrick Mccabe 2014-09-08 12:51:22 -07:00
parent d20047edda
commit bdcf5e940f
7 changed files with 334 additions and 16 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -193,6 +193,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
(Ming Ma via wheat9)
HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do
no-checksum reads that extend too long. (cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -249,6 +249,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms";
public static final String DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1, OOB_TYPE2, OOB_TYPE3, OOB_TYPE4
public static final String DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS = "dfs.datanode.cache.revocation.timeout.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT = 900000L;
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
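Each new key is paired with a compile-time default that callers hand to Configuration.getLong, which is how the FsDatasetCache constructor further down consumes these two keys. A minimal self-contained sketch of that lookup pattern; the class and main method are illustrative, not part of the patch, and only the key name and default are copied from above:

    import org.apache.hadoop.conf.Configuration;

    public class RevocationConfLookupSketch {
      static final String DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS =
          "dfs.datanode.cache.revocation.timeout.ms";
      static final long DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT = 900000L;

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Nothing has set the key, so getLong falls back to the
        // 15-minute compile-time default.
        long revocationMs = conf.getLong(
            DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
            DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
        System.out.println("cache revocation timeout = " + revocationMs + " ms");
      }
    }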

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java

@@ -26,6 +26,7 @@ import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -43,6 +44,7 @@ import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
@@ -83,11 +85,13 @@ public class ShortCircuitRegistry {
private static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
private final String clientName;
private final ShortCircuitRegistry registry;
-    RegisteredShm(ShmId shmId, FileInputStream stream,
+    RegisteredShm(String clientName, ShmId shmId, FileInputStream stream,
ShortCircuitRegistry registry) throws IOException {
super(shmId, stream);
this.clientName = clientName;
this.registry = registry;
}
@@ -100,6 +104,10 @@ public class ShortCircuitRegistry {
}
return true;
}
String getClientName() {
return clientName;
}
}
public synchronized void removeShm(ShortCircuitShm shm) {
@@ -243,6 +251,16 @@ public class ShortCircuitRegistry {
}
}
public synchronized String getClientNames(ExtendedBlockId blockId) {
if (!enabled) return "";
final HashSet<String> clientNames = new HashSet<String>();
final Set<Slot> affectedSlots = slots.get(blockId);
for (Slot slot : affectedSlots) {
clientNames.add(((RegisteredShm)slot.getShm()).getClientName());
}
return Joiner.on(",").join(clientNames);
}
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
@@ -290,7 +308,7 @@ public class ShortCircuitRegistry {
shmId = ShmId.createRandom();
} while (segments.containsKey(shmId));
fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
-      shm = new RegisteredShm(shmId, fis, this);
+      shm = new RegisteredShm(clientName, shmId, fis, this);
} finally {
if (shm == null) {
IOUtils.closeQuietly(fis);
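The new getClientNames method backs the forced-revocation warning in FsDatasetCache below: it collects the client name of every slot registered for the block into a HashSet (so a client holding several slots appears once) and comma-joins the result. A standalone sketch of that collect-and-join pattern; the client-name strings are hypothetical, not taken from the patch:

    import java.util.HashSet;

    import com.google.common.base.Joiner;

    public class ClientNamesSketch {
      public static void main(String[] args) {
        HashSet<String> clientNames = new HashSet<String>();
        // Two slots held by the same client collapse into one entry.
        clientNames.add("DFSClient_NONMAPREDUCE_1");  // hypothetical client name
        clientNames.add("DFSClient_NONMAPREDUCE_1");
        clientNames.add("DFSClient_NONMAPREDUCE_2");  // hypothetical client name
        // Same Joiner call as getClientNames; HashSet order is unspecified.
        System.out.println(Joiner.on(",").join(clientNames));
      }
    }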

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -18,6 +18,11 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -33,10 +38,12 @@ import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,6 +124,12 @@ public class FsDatasetCache {
private final ThreadPoolExecutor uncachingExecutor;
private final ScheduledThreadPoolExecutor deferredUncachingExecutor;
private final long revocationMs;
private final long revocationPollingMs;
/**
* The approximate amount of cache space in use.
*
@@ -217,6 +231,24 @@ public class FsDatasetCache {
new LinkedBlockingQueue<Runnable>(),
workerFactory);
this.uncachingExecutor.allowCoreThreadTimeOut(true);
this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
1, workerFactory);
this.revocationMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
long confRevocationPollingMs = dataset.datanode.getConf().getLong(
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
long minRevocationPollingMs = revocationMs / 2;
if (minRevocationPollingMs < confRevocationPollingMs) {
throw new RuntimeException("configured value " +
    confRevocationPollingMs + " for " +
    DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
    " is too high. It must not be more than half of the " +
    "value of " + DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
    ". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
}
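With the defaults, this startup check passes with a wide margin: half of the 900000 ms timeout is 450000 ms, far above the 500 ms default polling interval; only a polling interval above half the timeout triggers the RuntimeException. A tiny self-contained sketch of the same arithmetic, with the default values hard-coded as assumptions:

    public class RevocationPollingCheckSketch {
      public static void main(String[] args) {
        long revocationMs = 900000L;          // default revocation timeout
        long confRevocationPollingMs = 500L;  // default polling interval
        long maxPollingMs = revocationMs / 2; // at most 450000 ms allowed
        if (confRevocationPollingMs > maxPollingMs) {
          throw new RuntimeException("polling interval " + confRevocationPollingMs +
              " ms must not exceed half the revocation timeout (" +
              maxPollingMs + " ms)");
        }
        System.out.println(confRevocationPollingMs + " ms <= " + maxPollingMs + " ms: OK");
      }
    }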
/**
@@ -262,13 +294,11 @@
synchronized void uncacheBlock(String bpid, long blockId) {
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
Value prevValue = mappableBlockMap.get(key);
boolean deferred = false;
if (!dataset.datanode.getShortCircuitRegistry().
processBlockMunlockRequest(key)) {
-      // TODO: we probably want to forcibly uncache the block (and close the
-      // shm) after a certain timeout has elapsed.
-      LOG.debug("{} is anchored, and can't be uncached now.", key);
-      return;
+      deferred = true;
}
if (prevValue == null) {
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -285,12 +315,19 @@
new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
break;
case CACHED:
-        LOG.debug(
-            "Block with id {}, pool {} has been scheduled for uncaching" + ".",
-            blockId, bpid);
         mappableBlockMap.put(key,
             new Value(prevValue.mappableBlock, State.UNCACHING));
-        uncachingExecutor.execute(new UncachingTask(key));
+        if (deferred) {
+          LOG.debug("{} is anchored, and can't be uncached now. Scheduling it " +
+              "for uncaching in {}",
+              key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
+          deferredUncachingExecutor.schedule(
+              new UncachingTask(key, revocationMs),
+              revocationPollingMs, TimeUnit.MILLISECONDS);
+        } else {
+          LOG.debug("{} has been scheduled for immediate uncaching.", key);
+          uncachingExecutor.execute(new UncachingTask(key, 0));
+        }
break;
default:
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -403,22 +440,62 @@
private class UncachingTask implements Runnable {
private final ExtendedBlockId key;
private final long revocationTimeMs;
-    UncachingTask(ExtendedBlockId key) {
+    UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
this.key = key;
if (revocationDelayMs == 0) {
this.revocationTimeMs = 0;
} else {
this.revocationTimeMs = revocationDelayMs + Time.monotonicNow();
}
}
private boolean shouldDefer() {
/* If revocationTimeMs == 0, this is an immediate uncache request.
* No clients were anchored at the time we made the request. */
if (revocationTimeMs == 0) {
return false;
}
/* Let's check if any clients still have this block anchored. */
boolean anchored =
!dataset.datanode.getShortCircuitRegistry().
processBlockMunlockRequest(key);
if (!anchored) {
LOG.debug("Uncaching {} now that it is no longer in use " +
"by any clients.", key);
return false;
}
long delta = revocationTimeMs - Time.monotonicNow();
if (delta < 0) {
LOG.warn("Forcibly uncaching {} after {} " +
"because client(s) {} refused to stop using it.", key,
DurationFormatUtils.formatDurationHMS(revocationMs),
dataset.datanode.getShortCircuitRegistry().getClientNames(key));
return false;
}
LOG.info("Replica {} still can't be uncached because some " +
"clients continue to use it. Will wait for {}", key,
DurationFormatUtils.formatDurationHMS(delta));
return true;
}
@Override
public void run() {
Value value;
if (shouldDefer()) {
deferredUncachingExecutor.schedule(
this, revocationPollingMs, TimeUnit.MILLISECONDS);
return;
}
synchronized (FsDatasetCache.this) {
value = mappableBlockMap.get(key);
}
Preconditions.checkNotNull(value);
Preconditions.checkArgument(value.state == State.UNCACHING);
-      // TODO: we will eventually need to do revocation here if any clients
-      // are reading via mmap with checksums enabled.  See HDFS-5182.
IOUtils.closeQuietly(value.mappableBlock);
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
@@ -427,7 +504,13 @@
usedBytesCount.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
if (revocationTimeMs != 0) {
LOG.debug("Uncaching of {} completed. usedBytes = {}",
key, newUsedBytes);
} else {
LOG.debug("Deferred uncaching of {} completed. usedBytes = {}",
key, newUsedBytes);
}
}
}
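Taken together, run() and shouldDefer() form a self-rescheduling poll: while clients keep the replica anchored and the deadline has not passed, the task re-submits itself to the scheduled executor at the polling interval; otherwise it falls through and uncaches. A stripped-down sketch of that pattern, with a stand-in isAnchored() in place of the real ShortCircuitRegistry call and made-up intervals:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SelfReschedulingPollSketch {
      static final ScheduledThreadPoolExecutor executor =
          new ScheduledThreadPoolExecutor(1);
      static final AtomicInteger polls = new AtomicInteger();

      // Stand-in for processBlockMunlockRequest: the "client" releases
      // the replica after the third poll.
      static boolean isAnchored() {
        return polls.incrementAndGet() < 3;
      }

      public static void main(String[] args) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Runnable task = new Runnable() {
          @Override
          public void run() {
            if (isAnchored() && System.nanoTime() < deadlineNanos) {
              // Still anchored and before the deadline: check again later.
              executor.schedule(this, 100, TimeUnit.MILLISECONDS);
              return;
            }
            System.out.println("uncached after " + polls.get() + " polls");
            executor.shutdown();
          }
        };
        executor.execute(task);
      }
    }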

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2108,4 +2108,25 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.revocation.timeout.ms</name>
<value>900000</value>
<description>When the DFSClient reads from a block file which the DataNode is
caching, the DFSClient can skip verifying checksums. The DataNode will
keep the block file in cache until the client is done. If the client takes
an unusually long time, though, the DataNode may need to evict the block
file from the cache anyway. This value controls how long the DataNode will
wait for the client to release a replica that it is reading without
checksums.
</description>
</property>
<property>
<name>dfs.datanode.cache.revocation.polling.ms</name>
<value>500</value>
<description>How often the DataNode should poll to see if the clients have
stopped using a replica that the DataNode wants to uncache.
</description>
</property>
</configuration>
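Operators can override both values; the new tests below do exactly this with Configuration.setLong. A minimal illustrative example (the 60-second timeout and 1-second polling interval are made-up values that respect the half-the-timeout constraint, not values from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RevocationOverrideSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Revoke after 1 minute instead of the 15-minute default.
        conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 60000L);
        // Poll every second; must be at most half the timeout (30000 ms here).
        conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 1000L);
        System.out.println(conf.getLong(
            DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 0L) + " ms");
      }
    }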

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -89,7 +89,7 @@ public class TestFsDatasetCache {
private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
// Most Linux installs allow a default of 64KB locked memory
-  private static final long CACHE_CAPACITY = 64 * 1024;
+  static final long CACHE_CAPACITY = 64 * 1024;
// mlock always locks the entire page. So we don't need to deal with this
// rounding, use the OS page size for the block size.
private static final long PAGE_SIZE =

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java

@@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestFsDatasetCacheRevocation {
private static final Logger LOG = LoggerFactory.getLogger(
TestFsDatasetCacheRevocation.class);
private static CacheManipulator prevCacheManipulator;
private static TemporarySocketDirectory sockDir;
private static final int BLOCK_SIZE = 4096;
@Before
public void setUp() throws Exception {
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
DomainSocket.disableBindPathValidation();
sockDir = new TemporarySocketDirectory();
}
@After
public void tearDown() throws Exception {
// Restore the original CacheManipulator
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
sockDir.close();
}
private static Configuration getDefaultConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 50);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 250);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
TestFsDatasetCache.CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "sock").getAbsolutePath());
return conf;
}
/**
* Test that when a client has a replica mmapped, we will not un-mlock that
* replica for a reasonable amount of time, even if an uncache request
* occurs.
*/
@Test(timeout=120000)
public void testPinning() throws Exception {
Configuration conf = getDefaultConf();
// Set a really long revocation timeout, so that we won't reach it during
// this test.
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
1800000L);
// Poll very often
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
MiniDFSCluster cluster =
    new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
// Attempt to uncache file. The file should still be cached.
dfs.removeCacheDirective(cacheDirectiveId);
Thread.sleep(500);
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Un-mmap the file. The file should be uncached after this.
in.releaseBuffer(buf);
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
// Cleanup
in.close();
cluster.shutdown();
}
/**
* Test that when we have an uncache request, and the client refuses to release
* the replica for a long time, we will un-mlock it.
*/
@Test(timeout=120000)
public void testRevocation() throws Exception {
BlockReaderTestUtil.enableHdfsCachingTracing();
BlockReaderTestUtil.enableShortCircuitShmTracing();
Configuration conf = getDefaultConf();
// Set a really short revocation timeout.
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
// Poll very often
conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
MiniDFSCluster cluster =
    new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file2";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
// Attempt to uncache file. The file should get uncached.
LOG.info("removing cache directive {}", cacheDirectiveId);
dfs.removeCacheDirective(cacheDirectiveId);
LOG.info("finished removing cache directive {}", cacheDirectiveId);
Thread.sleep(1000);
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
// Cleanup
in.releaseBuffer(buf);
in.close();
cluster.shutdown();
}
}