HDFS-12528. Add an option to not disable short-circuit reads on failures. Contributed by Xiao Chen.

(cherry picked from commit 2e7331ca26)
This commit is contained in:
Weiwei Yang 2018-01-31 07:25:03 +08:00 committed by Xiao Chen
parent fffa7dc377
commit 0c2c4c20cb
7 changed files with 176 additions and 15 deletions

View File

@ -140,6 +140,9 @@ public interface HdfsClientConfigKeys {
boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
String DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY =
"dfs.domain.socket.disable.interval.seconds";
long DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT = 600;
String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS =
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import java.io.BufferedOutputStream;
@ -644,10 +645,17 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
LOG.debug("{}:{}", this, msg);
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
default:
LOG.warn(this + ": unknown response code " + resp.getStatus() +
" while attempting to set up short-circuit access. " +
resp.getMessage() + ". Disabling short-circuit read for DataNode "
+ datanode + " temporarily.");
final long expiration =
clientContext.getDomainSocketFactory().getPathExpireSeconds();
String disableMsg = "disabled temporarily for " + expiration + " seconds";
if (expiration == 0) {
disableMsg = "not disabled";
}
LOG.warn("{}: unknown response code {} while attempting to set up "
+ "short-circuit access. {}. Short-circuit read for "
+ "DataNode {} is {} based on {}.",
this, resp.getStatus(), resp.getMessage(), datanode,
disableMsg, DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY);
clientContext.getDomainSocketFactory()
.disableShortCircuitForPath(pathInfo.getPath());
return null;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.client.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -69,6 +70,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
@ -616,6 +619,7 @@ public class DfsClientConf {
private final long shortCircuitMmapCacheExpiryMs;
private final long shortCircuitMmapCacheRetryTimeout;
private final long shortCircuitCacheStaleThresholdMs;
private final long domainSocketDisableIntervalSeconds;
private final long keyProviderCacheExpiryMs;
@ -682,6 +686,11 @@ public class DfsClientConf {
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
domainSocketDisableIntervalSeconds = conf.getLong(
DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY,
DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT);
Preconditions.checkArgument(domainSocketDisableIntervalSeconds >= 0,
DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY + "can't be negative.");
keyProviderCacheExpiryMs = conf.getLong(
DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
@ -794,6 +803,13 @@ public class DfsClientConf {
return shortCircuitCacheStaleThresholdMs;
}
/**
* @return the domainSocketDisableIntervalSeconds
*/
public long getDomainSocketDisableIntervalSeconds() {
return domainSocketDisableIntervalSeconds;
}
/**
* @return the keyProviderCacheExpiryMs
*/
@ -828,7 +844,9 @@ public class DfsClientConf {
+ ", shortCircuitSharedMemoryWatcherInterruptCheckMs = "
+ shortCircuitSharedMemoryWatcherInterruptCheckMs
+ ", keyProviderCacheExpiryMs = "
+ keyProviderCacheExpiryMs;
+ keyProviderCacheExpiryMs
+ ", domainSocketDisableIntervalSeconds = "
+ domainSocketDisableIntervalSeconds;
}
}
}

View File

@ -92,10 +92,8 @@ public class DomainSocketFactory {
/**
* Information about domain socket paths.
*/
final Cache<String, PathState> pathMap =
CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
private final long pathExpireSeconds;
private final Cache<String, PathState> pathMap;
public DomainSocketFactory(ShortCircuitConf conf) {
final String feature;
@ -121,6 +119,10 @@ public class DomainSocketFactory {
LOG.debug(feature + " is enabled.");
}
}
pathExpireSeconds = conf.getDomainSocketDisableIntervalSeconds();
pathMap = CacheBuilder.newBuilder()
.expireAfterWrite(pathExpireSeconds, TimeUnit.SECONDS).build();
}
/**
@ -192,4 +194,8 @@ public class DomainSocketFactory {
public void clearPathMap() {
pathMap.invalidateAll();
}
public long getPathExpireSeconds() {
return pathExpireSeconds;
}
}

View File

@ -278,7 +278,7 @@ public class ShortCircuitCache implements Closeable {
* Maximum total size of the cache, including both mmapped and
* no$-mmapped elements.
*/
private final int maxTotalSize;
private int maxTotalSize;
/**
* Non-mmaped elements older than this will be closed.
@ -369,6 +369,11 @@ public class ShortCircuitCache implements Closeable {
return staleThresholdMs;
}
@VisibleForTesting
public void setMaxTotalSize(int maxTotalSize) {
this.maxTotalSize = maxTotalSize;
}
/**
* Increment the reference count of a replica, and remove it from any free
* list it may be in.
@ -1025,4 +1030,13 @@ public class ShortCircuitCache implements Closeable {
public DfsClientShmManager getDfsClientShmManager() {
return shmManager;
}
/**
* Can be used in testing to verify whether a read went through SCR, after
* the read is done and before the stream is closed.
*/
@VisibleForTesting
public int getReplicaInfoMapSize() {
return replicaInfoMap.size();
}
}

View File

@ -2550,6 +2550,17 @@
</description>
</property>
<property>
<name>dfs.domain.socket.disable.interval.seconds</name>
<value>600</value>
<description>
The interval that a DataNode is disabled for future Short-Circuit Reads,
after an error happens during a Short-Circuit Read. Setting this to 0 will
not disable Short-Circuit Reads at all after errors happen. Negative values
are invalid.
</description>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONT
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File;
@ -36,9 +38,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
@ -54,18 +55,29 @@ import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisito
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestBlockReaderFactory {
static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
static final Logger LOG =
LoggerFactory.getLogger(TestBlockReaderFactory.class);
@Rule
public final Timeout globalTimeout = new Timeout(180000);
@Before
public void init() {
@ -209,7 +221,7 @@ public class TestBlockReaderFactory {
* occurs); however, the failure result should not be cached. We want
* to be able to retry later and succeed.
*/
@Test(timeout=60000)
@Test
public void testShortCircuitCacheTemporaryFailure()
throws Exception {
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
@ -302,6 +314,95 @@ public class TestBlockReaderFactory {
Assert.assertFalse(testFailed.get());
}
/**
* Test that by default, reads after a failure does not go through SCR.
*/
@Test
public void testShortCircuitCacheUnbufferDefault() throws Exception {
testShortCircuitCacheUnbufferWithDisableInterval(
DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT, true);
}
/**
* Test the case where if we disable the cache in
* {@link org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory}, reads
* after a failure still goes through SCR.
*/
@Test
public void testShortCircuitCacheUnbufferDisabled() throws Exception {
testShortCircuitCacheUnbufferWithDisableInterval(0, false);
}
private void testShortCircuitCacheUnbufferWithDisableInterval(
final long interval, final boolean disabled) throws Exception {
final String testName = GenericTestUtils.getMethodName();
BlockReaderTestUtil.enableBlockReaderFactoryTracing();
try (TemporarySocketDirectory sockDir = new TemporarySocketDirectory()) {
Configuration conf = createShortCircuitConf(testName, sockDir);
conf.set(DFS_CLIENT_CONTEXT, testName + interval + disabled);
conf.setLong(DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY, interval);
Configuration serverConf = new Configuration(conf);
MiniDFSCluster.Builder builder =
new MiniDFSCluster.Builder(serverConf).numDataNodes(1);
try (MiniDFSCluster cluster = builder.build();
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
.get(cluster.getURI(0), conf)) {
cluster.waitActive();
final Path testFile = new Path("/test_file");
final int testFileLen = 4000;
final int seed = 0xFADED;
DFSTestUtil.createFile(dfs, testFile, testFileLen, (short) 1, seed);
final byte[] expected = DFSTestUtil.
calculateFileContentsFromSeed(seed, testFileLen);
try (FSDataInputStream in = dfs.open(testFile)) {
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
.getReplicaInfoMapSize());
final byte[] buf = new byte[testFileLen];
IOUtils.readFully(in, buf, 0, testFileLen);
validateReadResult(dfs, expected, buf, 1);
// Set cache size to 0 so the replica marked evictable by unbuffer
// will be purged immediately.
dfs.getClient().getClientContext().getShortCircuitCache()
.setMaxTotalSize(0);
LOG.info("Unbuffering");
in.unbuffer();
Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache()
.getReplicaInfoMapSize());
DFSTestUtil.appendFile(dfs, testFile, "append more data");
// This read will force a new replica read via TCP.
Arrays.fill(buf, (byte) 0);
in.seek(0);
IOUtils.readFully(in, buf, 0, testFileLen);
validateReadResult(dfs, expected, buf, 0);
}
LOG.info("Reading {} again.", testFile);
try (FSDataInputStream in = dfs.open(testFile)) {
final byte[] buf = new byte[testFileLen];
Arrays.fill(buf, (byte) 0);
IOUtils.readFully(in, buf, 0, testFileLen);
final int expectedMapSize = disabled ? 0 : 1;
validateReadResult(dfs, expected, buf, expectedMapSize);
}
}
}
}
private void validateReadResult(final DistributedFileSystem dfs,
final byte[] expected, final byte[] actual,
final int expectedScrRepMapSize) {
Assert.assertThat(expected, CoreMatchers.is(actual));
Assert.assertEquals(expectedScrRepMapSize,
dfs.getClient().getClientContext().getShortCircuitCache()
.getReplicaInfoMapSize());
}
/**
* Test that a client which supports short-circuit reads using
* shared memory can fall back to not using shared memory when