HDFS-8082. Move dfs.client.read.*, dfs.client.short.circuit.*, dfs.client.mmap.* and dfs.client.hedged.read.* conf from DFSConfigKeys to HdfsClientConfigKeys.

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-16 13:22:31 -07:00
parent e1ebad8203
commit bad547d969
28 changed files with 360 additions and 239 deletions

View File

@ -32,8 +32,8 @@ public interface HdfsClientConfigKeys {
static final String PREFIX = "dfs.client."; static final String PREFIX = "dfs.client.";
/** Client retry configuration properties */ /** dfs.client.retry configuration properties */
public interface Retry { interface Retry {
String PREFIX = HdfsClientConfigKeys.PREFIX + "retry."; String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";
String POLICY_ENABLED_KEY = PREFIX + "policy.enabled"; String POLICY_ENABLED_KEY = PREFIX + "policy.enabled";
@ -53,7 +53,7 @@ public interface HdfsClientConfigKeys {
int WINDOW_BASE_DEFAULT = 3000; int WINDOW_BASE_DEFAULT = 3000;
} }
/** Client failover configuration properties */ /** dfs.client.failover configuration properties */
interface Failover { interface Failover {
String PREFIX = HdfsClientConfigKeys.PREFIX + "failover."; String PREFIX = HdfsClientConfigKeys.PREFIX + "failover.";
@ -70,6 +70,7 @@ public interface HdfsClientConfigKeys {
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0; int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
} }
/** dfs.client.write configuration properties */
interface Write { interface Write {
String PREFIX = HdfsClientConfigKeys.PREFIX + "write."; String PREFIX = HdfsClientConfigKeys.PREFIX + "write.";
@ -92,6 +93,7 @@ public interface HdfsClientConfigKeys {
} }
} }
/** dfs.client.block.write configuration properties */
interface BlockWrite { interface BlockWrite {
String PREFIX = HdfsClientConfigKeys.PREFIX + "block.write."; String PREFIX = HdfsClientConfigKeys.PREFIX + "block.write.";
@ -114,7 +116,60 @@ public interface HdfsClientConfigKeys {
} }
} }
/** HTTP client configuration properties */ /** dfs.client.read configuration properties */
interface Read {
String PREFIX = HdfsClientConfigKeys.PREFIX + "read.";
String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
interface ShortCircuit {
String PREFIX = Read.PREFIX + "shortcircuit.";
String KEY = PREFIX.substring(0, PREFIX.length()-1);
boolean DEFAULT = false;
String SKIP_CHECKSUM_KEY = PREFIX + "skip.checksum";
boolean SKIP_CHECKSUM_DEFAULT = false;
String BUFFER_SIZE_KEY = PREFIX + "buffer.size";
int BUFFER_SIZE_DEFAULT = 1024 * 1024;
String STREAMS_CACHE_SIZE_KEY = PREFIX + "streams.cache.size";
int STREAMS_CACHE_SIZE_DEFAULT = 256;
String STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms";
long STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE;
}
}
/** dfs.client.short.circuit configuration properties */
interface ShortCircuit {
String PREFIX = Read.PREFIX + "short.circuit.";
String REPLICA_STALE_THRESHOLD_MS_KEY = PREFIX + "replica.stale.threshold.ms";
long REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30*MINUTE;
}
/** dfs.client.mmap configuration properties */
interface Mmap {
String PREFIX = HdfsClientConfigKeys.PREFIX + "mmap.";
String ENABLED_KEY = PREFIX + "enabled";
boolean ENABLED_DEFAULT = true;
String CACHE_SIZE_KEY = PREFIX + "cache.size";
int CACHE_SIZE_DEFAULT = 256;
String CACHE_TIMEOUT_MS_KEY = PREFIX + "cache.timeout.ms";
long CACHE_TIMEOUT_MS_DEFAULT = 60*MINUTE;
String RETRY_TIMEOUT_MS_KEY = PREFIX + "retry.timeout.ms";
long RETRY_TIMEOUT_MS_DEFAULT = 5*MINUTE;
}
/** dfs.client.hedged.read configuration properties */
interface HedgedRead {
String THRESHOLD_MILLIS_KEY = PREFIX + "threshold.millis";
long THRESHOLD_MILLIS_DEFAULT = 500;
String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size";
int THREADPOOL_SIZE_DEFAULT = 0;
}
/** dfs.http.client configuration properties */
interface HttpClient { interface HttpClient {
String PREFIX = "dfs.http.client."; String PREFIX = "dfs.http.client.";

View File

@ -120,6 +120,10 @@ Release 2.8.0 - UNRELEASED
HDFS-8144. Split TestLazyPersistFiles into multiple tests. (Arpit Agarwal) HDFS-8144. Split TestLazyPersistFiles into multiple tests. (Arpit Agarwal)
HDFS-8082. Move dfs.client.read.*, dfs.client.short.circuit.*,
dfs.client.mmap.* and dfs.client.hedged.read.* conf from DFSConfigKeys
to HdfsClientConfigKeys. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -307,7 +308,7 @@ class BlockReaderLocalLegacy implements BlockReader {
throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
"buffer size (" + bufferSizeBytes + ") is not large enough to hold " + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
"a single chunk (" + bytesPerChecksum + "). Please configure " + "a single chunk (" + bytesPerChecksum + "). Please configure " +
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
" appropriately"); " appropriately");
} }

View File

@ -234,7 +234,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private final CachingStrategy defaultReadCachingStrategy; private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy; private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext; private final ClientContext clientContext;
private volatile long hedgedReadThresholdMillis;
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics(); new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
@ -369,14 +369,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.clientContext = ClientContext.get( this.clientContext = ClientContext.get(
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
dfsClientConf); dfsClientConf);
this.hedgedReadThresholdMillis = conf.getLong(
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS); this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
int numThreads = conf.getInt(
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads);
} }
this.saslClient = new SaslDataTransferClient( this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
@ -3132,15 +3127,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} }
long getHedgedReadTimeout() {
return this.hedgedReadThresholdMillis;
}
@VisibleForTesting
void setHedgedReadTimeout(long timeoutMillis) {
this.hedgedReadThresholdMillis = timeoutMillis;
}
ThreadPoolExecutor getHedgedReadsThreadPool() { ThreadPoolExecutor getHedgedReadsThreadPool() {
return HEDGED_READ_THREAD_POOL; return HEDGED_READ_THREAD_POOL;
} }

View File

@ -693,7 +693,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// client retry confs are moved to HdfsClientConfigKeys.Retry // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated @Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
= HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY; = HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY;
@ -731,7 +731,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT
= HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT; = HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT;
// client failover confs are moved to HdfsClientConfigKeys.Failover // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover
@Deprecated @Deprecated
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
= HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX; = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
@ -766,7 +766,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
= HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT; = HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
// client write confs are moved to HdfsClientConfigKeys.Write // dfs.client.write confs are moved to HdfsClientConfigKeys.Write
@Deprecated @Deprecated
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY
= HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY; = HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY;
@ -804,7 +804,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT; = HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT;
// client block.write confs are moved to HdfsClientConfigKeys.BlockWrite // dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite
@Deprecated @Deprecated
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY
= HdfsClientConfigKeys.BlockWrite.RETRIES_KEY; = HdfsClientConfigKeys.BlockWrite.RETRIES_KEY;
@ -842,6 +842,88 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT; = HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT;
// dfs.client.read confs are moved to HdfsClientConfigKeys.Read
@Deprecated
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY
= HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY;
@Deprecated
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY
= HdfsClientConfigKeys.Read.ShortCircuit.KEY;
@Deprecated
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT
= HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT;
@Deprecated
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY
= HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY;
@Deprecated
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT
= HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY
= HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY;
@Deprecated
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT
= HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY;
@Deprecated
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY;
@Deprecated
public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT;
// dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap
@Deprecated
public static final String DFS_CLIENT_MMAP_ENABLED
= HdfsClientConfigKeys.Mmap.ENABLED_KEY;
@Deprecated
public static final boolean DFS_CLIENT_MMAP_ENABLED_DEFAULT
= HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_MMAP_CACHE_SIZE
= HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY;
@Deprecated
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT
= HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS
= HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY;
@Deprecated
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT
= HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS
= HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY;
@Deprecated
public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT
= HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT;
// dfs.client.short.circuit confs are moved to HdfsClientConfigKeys.ShortCircuit
@Deprecated
public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS
= HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY;
@Deprecated
public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT
= HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT;
// dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead
@Deprecated
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
= HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY;
@Deprecated
public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
= HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT;
@Deprecated
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE
= HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY;
@Deprecated
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE
= HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT;
@ -888,30 +970,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces"; public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 256;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5 * 60 * 1000;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic"; public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false; public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
public static final String DFS_CLIENT_MMAP_ENABLED= "dfs.client.mmap.enabled";
public static final boolean DFS_CLIENT_MMAP_ENABLED_DEFAULT = true;
public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 256;
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 60 * 60 * 1000;
public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS = "dfs.client.mmap.retry.timeout.ms";
public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms";
public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000;
// The number of NN response dropped by client proactively in each RPC call. // The number of NN response dropped by client proactively in each RPC call.
// For testing NN retry cache, we can set this property with positive value. // For testing NN retry cache, we can set this property with positive value.
@ -924,14 +985,4 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.client.key.provider.cache.expiry"; "dfs.client.key.provider.cache.expiry";
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10); // 10 days TimeUnit.DAYS.toMillis(10); // 10 days
// hedged read properties
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
"dfs.client.hedged.read.threshold.millis";
public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
500;
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
"dfs.client.hedged.read.threadpool.size";
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
} }

View File

@ -1196,6 +1196,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long end, byte[] buf, int offset, long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException { throws IOException {
final DfsClientConf conf = dfsClient.getConf();
ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(); ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
CompletionService<ByteBuffer> hedgedService = CompletionService<ByteBuffer> hedgedService =
new ExecutorCompletionService<ByteBuffer>( new ExecutorCompletionService<ByteBuffer>(
@ -1223,13 +1224,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
futures.add(firstRequest); futures.add(firstRequest);
try { try {
Future<ByteBuffer> future = hedgedService.poll( Future<ByteBuffer> future = hedgedService.poll(
dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) { if (future != null) {
future.get(); future.get();
return; return;
} }
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
+ "ms to read from " + chosenNode.info + "ms to read from " + chosenNode.info
+ "; spawning hedged read"); + "; spawning hedged read");
} }

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
/** /**
* Adds deprecated keys into the configuration. * Adds deprecated keys into the configuration.
@ -85,7 +85,7 @@ public class HdfsConfiguration extends Configuration {
new DeprecationDelta("dfs.name.edits.dir", new DeprecationDelta("dfs.name.edits.dir",
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY), DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY),
new DeprecationDelta("dfs.read.prefetch.size", new DeprecationDelta("dfs.read.prefetch.size",
DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY), HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY),
new DeprecationDelta("dfs.safemode.extension", new DeprecationDelta("dfs.safemode.extension",
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY), DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY),
new DeprecationDelta("dfs.safemode.threshold.pct", new DeprecationDelta("dfs.safemode.threshold.pct",

View File

@ -27,7 +27,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_T
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
@ -99,6 +98,9 @@ public class DfsClientConf {
private final ShortCircuitConf shortCircuitConf; private final ShortCircuitConf shortCircuitConf;
private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf); hdfsTimeout = Client.getTimeout(conf);
@ -172,7 +174,7 @@ public class DfsClientConf {
excludedNodesCacheExpiry = conf.getLong( excludedNodesCacheExpiry = conf.getLong(
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY, HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize = conf.getLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
10 * defaultBlockSize); 10 * defaultBlockSize);
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
@ -206,6 +208,13 @@ public class DfsClientConf {
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
shortCircuitConf = new ShortCircuitConf(conf); shortCircuitConf = new ShortCircuitConf(conf);
hedgedReadThresholdMillis = conf.getLong(
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT);
hedgedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
} }
private DataChecksum.Type getChecksumType(Configuration conf) { private DataChecksum.Type getChecksumType(Configuration conf) {
@ -468,6 +477,20 @@ public class DfsClientConf {
return slowIoWarningThresholdMs; return slowIoWarningThresholdMs;
} }
/**
* @return the hedgedReadThresholdMillis
*/
public long getHedgedReadThresholdMillis() {
return hedgedReadThresholdMillis;
}
/**
* @return the hedgedReadThreadpoolSize
*/
public int getHedgedReadThreadpoolSize() {
return hedgedReadThreadpoolSize;
}
/** /**
* @return the shortCircuitConf * @return the shortCircuitConf
*/ */
@ -520,8 +543,8 @@ public class DfsClientConf {
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
shortCircuitLocalReads = conf.getBoolean( shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, HdfsClientConfigKeys.Read.ShortCircuit.KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT);
domainSocketDataTraffic = conf.getBoolean( domainSocketDataTraffic = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
@ -532,7 +555,7 @@ public class DfsClientConf {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = " + useLegacyBlockReaderLocal); + " = " + useLegacyBlockReaderLocal);
LOG.debug(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY LOG.debug(HdfsClientConfigKeys.Read.ShortCircuit.KEY
+ " = " + shortCircuitLocalReads); + " = " + shortCircuitLocalReads);
LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = " + domainSocketDataTraffic); + " = " + domainSocketDataTraffic);
@ -541,32 +564,32 @@ public class DfsClientConf {
} }
skipShortCircuitChecksums = conf.getBoolean( skipShortCircuitChecksums = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt( shortCircuitBufferSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
shortCircuitStreamsCacheSize = conf.getInt( shortCircuitStreamsCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT); HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
shortCircuitStreamsCacheExpiryMs = conf.getLong( shortCircuitStreamsCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT); HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
shortCircuitMmapEnabled = conf.getBoolean( shortCircuitMmapEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED, HdfsClientConfigKeys.Mmap.ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT); HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT);
shortCircuitMmapCacheSize = conf.getInt( shortCircuitMmapCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT); HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT);
shortCircuitMmapCacheExpiryMs = conf.getLong( shortCircuitMmapCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT); HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT);
shortCircuitMmapCacheRetryTimeout = conf.getLong( shortCircuitMmapCacheRetryTimeout = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS, HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY,
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT); HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT);
shortCircuitCacheStaleThresholdMs = conf.getLong( shortCircuitCacheStaleThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS, HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT); HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);

View File

@ -38,8 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
@ -87,10 +87,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName; import javax.management.ObjectName;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -102,14 +98,15 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -143,8 +140,8 @@ import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -189,7 +186,10 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService; import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocolPB; import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB; import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -201,14 +201,15 @@ import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
/********************************************************** /**********************************************************
@ -413,8 +414,8 @@ public class DataNode extends ReconfigurableBase
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
// Determine whether we should try to pass file descriptors to clients. // Determine whether we should try to pass file descriptors to clients.
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) { HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
String reason = DomainSocket.getLoadingFailureReason(); String reason = DomainSocket.getLoadingFailureReason();
if (reason != null) { if (reason != null) {
LOG.warn("File descriptor passing is disabled because " + reason); LOG.warn("File descriptor passing is disabled because " + reason);
@ -934,8 +935,8 @@ public class DataNode extends ReconfigurableBase
this.dataXceiverServer = new Daemon(threadGroup, xserver); this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) || HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) { DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer = DomainPeerServer domainPeerServer =
@ -950,14 +951,14 @@ public class DataNode extends ReconfigurableBase
this.shortCircuitRegistry = new ShortCircuitRegistry(conf); this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
} }
static DomainPeerServer getDomainPeerServer(Configuration conf, private static DomainPeerServer getDomainPeerServer(Configuration conf,
int port) throws IOException { int port) throws IOException {
String domainSocketPath = String domainSocketPath =
conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
if (domainSocketPath.isEmpty()) { if (domainSocketPath.isEmpty()) {
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) && HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) &&
(!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) { DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
LOG.warn("Although short-circuit local reads are configured, " + LOG.warn("Although short-circuit local reads are configured, " +

View File

@ -37,8 +37,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.DomainPeer;
@ -339,27 +337,6 @@ public class ShortCircuitCache implements Closeable {
*/ */
private final DfsClientShmManager shmManager; private final DfsClientShmManager shmManager;
/**
* Create a {@link ShortCircuitCache} object from a {@link Configuration}
*/
public static ShortCircuitCache fromConf(Configuration conf) {
return new ShortCircuitCache(
conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
}
public static ShortCircuitCache fromConf(ShortCircuitConf conf) { public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache( return new ShortCircuitCache(
conf.getShortCircuitStreamsCacheSize(), conf.getShortCircuitStreamsCacheSize(),

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
@ -48,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -120,15 +119,15 @@ public class TestEnhancedByteBufferAccess {
Assume.assumeTrue(NativeIO.isAvailable()); Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX); Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 3);
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); conf.setLong(HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath()); "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); true);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000); conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000); conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
@ -597,8 +596,8 @@ public class TestEnhancedByteBufferAccess {
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");
final int RANDOM_SEED = 23453; final int RANDOM_SEED = 23453;
HdfsConfiguration conf = initZeroCopyTest(); HdfsConfiguration conf = initZeroCopyTest();
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); false);
final String CONTEXT = "testZeroCopyReadOfCachedData"; final String CONTEXT = "testZeroCopyReadOfCachedData";
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
@ -715,7 +714,7 @@ public class TestEnhancedByteBufferAccess {
@Test @Test
public void testClientMmapDisable() throws Exception { public void testClientMmapDisable() throws Exception {
HdfsConfiguration conf = initZeroCopyTest(); HdfsConfiguration conf = initZeroCopyTest();
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false); conf.setBoolean(HdfsClientConfigKeys.Mmap.ENABLED_KEY, false);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385; final int TEST_FILE_LENGTH = 16385;
@ -726,8 +725,8 @@ public class TestEnhancedByteBufferAccess {
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
try { try {
// With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory // With HdfsClientConfigKeys.Mmap.ENABLED_KEY set to false,
// mapped reads. // we should not do memory mapped reads.
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
@ -751,9 +750,9 @@ public class TestEnhancedByteBufferAccess {
fs = null; fs = null;
cluster = null; cluster = null;
try { try {
// Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0. It should work. // Now try again with HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY == 0.
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true); conf.setBoolean(HdfsClientConfigKeys.Mmap.ENABLED_KEY, true);
conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0); conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 0);
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1"); conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.fs;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.PeerCache; import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -47,7 +47,7 @@ public class TestUnbuffer {
// Disable short-circuit reads. With short-circuit, we wouldn't hold open a // Disable short-circuit reads. With short-circuit, we wouldn't hold open a
// TCP socket. // TCP socket.
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
// Set a really long socket timeout to avoid test timing issues. // Set a really long socket timeout to avoid test timing issues.
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@ -100,7 +100,7 @@ public class TestUnbuffer {
public void testOpenManyFilesViaTcp() throws Exception { public void testOpenManyFilesViaTcp() throws Exception {
final int NUM_OPENS = 500; final int NUM_OPENS = 500;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS]; FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
try { try {

View File

@ -1470,7 +1470,7 @@ public class DFSTestUtil {
public Configuration newConfiguration() { public Configuration newConfiguration() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
testName + "._PORT.sock").getAbsolutePath()); testName + "._PORT.sock").getAbsolutePath());

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -43,6 +41,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@ -81,8 +80,8 @@ public class TestBlockReaderFactory {
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
testName + "._PORT").getAbsolutePath()); testName + "._PORT").getAbsolutePath());
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false); false);
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
return conf; return conf;
@ -108,7 +107,7 @@ public class TestBlockReaderFactory {
"testFallbackFromShortCircuitToUnixDomainTraffic_clientContext"); "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
Configuration serverConf = new Configuration(clientConf); Configuration serverConf = new Configuration(clientConf);
serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -123,8 +124,8 @@ public class TestBlockReaderLocal {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum); !checksum);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
BlockReaderLocalTest.BYTES_PER_CHECKSUM); BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
@ -721,10 +722,10 @@ public class TestBlockReaderLocal {
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock"). new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
getAbsolutePath()); getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
} else { } else {
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
} }
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");

View File

@ -63,9 +63,9 @@ public class TestBlockReaderLocalLegacy {
new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock"). new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
getAbsolutePath()); getAbsolutePath());
} }
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false); false);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName()); UserGroupInformation.getCurrentUser().getShortUserName());

View File

@ -17,9 +17,7 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -29,7 +27,7 @@ public class TestParallelRead extends TestParallelReadUtil {
// This is a test of the normal (TCP) read path. For this reason, we turn // This is a test of the normal (TCP) read path. For this reason, we turn
// off both short-circuit local reads and UNIX domain socket data traffic. // off both short-circuit local reads and UNIX domain socket data traffic.
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
false); false);
// dfs.domain.socket.path should be ignored because the previous two keys // dfs.domain.socket.path should be ignored because the previous two keys

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -30,9 +31,9 @@ public class TestParallelShortCircuitLegacyRead extends TestParallelReadUtil {
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, ""); conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); false);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName()); UserGroupInformation.getCurrentUser().getShortUserName());
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();

View File

@ -17,15 +17,17 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File; import java.io.File;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.*;
public class TestParallelShortCircuitRead extends TestParallelReadUtil { public class TestParallelShortCircuitRead extends TestParallelReadUtil {
private static TemporarySocketDirectory sockDir; private static TemporarySocketDirectory sockDir;
@ -38,9 +40,9 @@ public class TestParallelShortCircuitRead extends TestParallelReadUtil {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath()); new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); false);
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
setupCluster(1, conf); setupCluster(1, conf);
} }

View File

@ -17,15 +17,17 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File; import java.io.File;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.*;
public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil { public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
private static TemporarySocketDirectory sockDir; private static TemporarySocketDirectory sockDir;
@ -38,9 +40,9 @@ public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath()); new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); true);
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
setupCluster(1, conf); setupCluster(1, conf);
} }

View File

@ -17,14 +17,17 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File; import java.io.File;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.*;
/** /**
* This class tests short-circuit local reads without any FileInputStream or * This class tests short-circuit local reads without any FileInputStream or
@ -41,13 +44,13 @@ public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath()); "TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
// Enabling data transfer encryption should have no effect when using // Enabling data transfer encryption should have no effect when using
// short-circuit local reads. This is a regression test for HDFS-5353. // short-circuit local reads. This is a regression test for HDFS-5353.
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(DFSConfigKeys. conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); false);
conf.setBoolean(DFSConfigKeys. conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
// We want to test reading from stale sockets. // We want to test reading from stale sockets.
@ -56,8 +59,8 @@ public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
5 * 60 * 1000); 5 * 60 * 1000);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32); conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
// Avoid using the FileInputStreamCache. // Avoid using the FileInputStreamCache.
conf.setInt(DFSConfigKeys. conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0); 0);
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
DFSInputStream.tcpReadsDisabledForTesting = true; DFSInputStream.tcpReadsDisabledForTesting = true;
setupCluster(1, conf); setupCluster(1, conf);

View File

@ -17,15 +17,17 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.File; import java.io.File;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.*;
public class TestParallelUnixDomainRead extends TestParallelReadUtil { public class TestParallelUnixDomainRead extends TestParallelReadUtil {
private static TemporarySocketDirectory sockDir; private static TemporarySocketDirectory sockDir;
@ -38,7 +40,7 @@ public class TestParallelUnixDomainRead extends TestParallelReadUtil {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath()); new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
setupCluster(1, conf); setupCluster(1, conf);

View File

@ -276,8 +276,8 @@ public class TestPread {
public void testHedgedPreadDFSBasic() throws IOException { public void testHedgedPreadDFSBasic() throws IOException {
isHedgedRead = true; isHedgedRead = true;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 5);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1); conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 1);
dfsPreadTest(conf, false, true); // normal pread dfsPreadTest(conf, false, true); // normal pread
dfsPreadTest(conf, true, true); // trigger read code path without dfsPreadTest(conf, true, true); // trigger read code path without
// transferTo. // transferTo.
@ -289,9 +289,9 @@ public class TestPread {
int numHedgedReadPoolThreads = 5; int numHedgedReadPoolThreads = 5;
final int hedgedReadTimeoutMillis = 50; final int hedgedReadTimeoutMillis = 50;
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
numHedgedReadPoolThreads); numHedgedReadPoolThreads);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
hedgedReadTimeoutMillis); hedgedReadTimeoutMillis);
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0); conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
// Set up the InjectionHandler // Set up the InjectionHandler
@ -365,9 +365,9 @@ public class TestPread {
int numHedgedReadPoolThreads = 5; int numHedgedReadPoolThreads = 5;
final int initialHedgedReadTimeoutMillis = 50000; final int initialHedgedReadTimeoutMillis = 50000;
final int fixedSleepIntervalMillis = 50; final int fixedSleepIntervalMillis = 50;
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
numHedgedReadPoolThreads); numHedgedReadPoolThreads);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
initialHedgedReadTimeoutMillis); initialHedgedReadTimeoutMillis);
// Set up the InjectionHandler // Set up the InjectionHandler
@ -407,7 +407,14 @@ public class TestPread {
* that there were hedged reads. But, none of the reads had to run in the * that there were hedged reads. But, none of the reads had to run in the
* current thread. * current thread.
*/ */
dfsClient.setHedgedReadTimeout(50); // 50ms {
Configuration conf2 = new Configuration(cluster.getConfiguration(0));
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
conf2.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 50);
fileSys.close();
fileSys = (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf2);
metrics = fileSys.getClient().getHedgedReadMetrics();
}
pReadFile(fileSys, file1); pReadFile(fileSys, file1);
// assert that there were hedged reads // assert that there were hedged reads
assertTrue(metrics.getHedgedReadOps() > 0); assertTrue(metrics.getHedgedReadOps() > 0);
@ -442,7 +449,7 @@ public class TestPread {
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum) private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
throws IOException { throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, 4096);
// Set short retry timeouts so this test runs faster // Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0); conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
if (simulatedStorage) { if (simulatedStorage) {

View File

@ -23,8 +23,6 @@ import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -35,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -84,7 +83,7 @@ public class TestFsDatasetCacheRevocation {
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
TestFsDatasetCache.CACHE_CAPACITY); TestFsDatasetCache.CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "sock").getAbsolutePath()); new File(sockDir.getDir(), "sock").getAbsolutePath());
return conf; return conf;

View File

@ -17,6 +17,34 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -24,12 +52,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@ -44,25 +73,6 @@ import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public abstract class LazyPersistTestCase { public abstract class LazyPersistTestCase {
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@ -237,7 +247,7 @@ public abstract class LazyPersistTestCase {
EVICTION_LOW_WATERMARK * BLOCK_SIZE); EVICTION_LOW_WATERMARK * BLOCK_SIZE);
if (useSCR) { if (useSCR) {
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
// Do not share a client context across tests. // Do not share a client context across tests.
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString()); conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
if (useLegacyBlockReaderLocal) { if (useLegacyBlockReaderLocal) {
@ -299,7 +309,7 @@ public abstract class LazyPersistTestCase {
if (useSCR) if (useSCR)
{ {
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,useSCR);
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString()); conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
sockDir = new TemporarySocketDirectory(); sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),

View File

@ -20,10 +20,7 @@ package org.apache.hadoop.hdfs.shortcircuit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -36,7 +33,6 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import com.google.common.collect.HashMultimap;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -45,12 +41,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -61,8 +57,8 @@ import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisito
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@ -76,6 +72,7 @@ import org.junit.Test;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
public class TestShortCircuitCache { public class TestShortCircuitCache {
static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class); static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
@ -388,8 +385,8 @@ public class TestShortCircuitCache {
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
testName).getAbsolutePath()); testName).getAbsolutePath());
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false); false);
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
DFSInputStream.tcpReadsDisabledForTesting = true; DFSInputStream.tcpReadsDisabledForTesting = true;
@ -541,7 +538,7 @@ public class TestShortCircuitCache {
"testUnlinkingReplicasInFileDescriptorCache", sockDir); "testUnlinkingReplicasInFileDescriptorCache", sockDir);
// We don't want the CacheCleaner to time out short-circuit shared memory // We don't want the CacheCleaner to time out short-circuit shared memory
// segments during the test, so set the timeout really high. // segments during the test, so set the timeout really high.
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L); 1000000000L);
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
@ -637,7 +634,8 @@ public class TestShortCircuitCache {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf( Configuration conf = createShortCircuitConf(
"testDataXceiverCleansUpSlotsOnFailure", sockDir); "testDataXceiverCleansUpSlotsOnFailure", sockDir);
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, conf.setLong(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L); 1000000000L);
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockReaderLocal; import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -247,8 +248,8 @@ public class TestShortCircuitLocalRead {
int readOffset, String shortCircuitUser, String readingUser, int readOffset, String shortCircuitUser, String readingUser,
boolean legacyShortCircuitFails) throws IOException, InterruptedException { boolean legacyShortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
ignoreChecksum); ignoreChecksum);
// Set a random client context name so that we don't share a cache with // Set a random client context name so that we don't share a cache with
// other invocations of this function. // other invocations of this function.
@ -384,8 +385,8 @@ public class TestShortCircuitLocalRead {
public void testSkipWithVerifyChecksum() throws IOException { public void testSkipWithVerifyChecksum() throws IOException {
int size = blockSize; int size = blockSize;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
"/tmp/testSkipWithVerifyChecksum._PORT"); "/tmp/testSkipWithVerifyChecksum._PORT");
DomainSocket.disableBindPathValidation(); DomainSocket.disableBindPathValidation();
@ -431,8 +432,8 @@ public class TestShortCircuitLocalRead {
public void testHandleTruncatedBlockFile() throws IOException { public void testHandleTruncatedBlockFile() throws IOException {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
"/tmp/testHandleTruncatedBlockFile._PORT"); "/tmp/testHandleTruncatedBlockFile._PORT");
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
@ -529,10 +530,10 @@ public class TestShortCircuitLocalRead {
// Setup create a file // Setup create a file
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, shortcircuit);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
"/tmp/TestShortCircuitLocalRead._PORT"); "/tmp/TestShortCircuitLocalRead._PORT");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
checksum); checksum);
//Override fileSize and DATA_TO_WRITE to much larger values for benchmark test //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
@ -592,7 +593,7 @@ public class TestShortCircuitLocalRead {
int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException { int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build(); .format(true).build();

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.tracing;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -26,19 +28,17 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.htrace.Sampler; import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
public class TestTracingShortCircuitLocalRead { public class TestTracingShortCircuitLocalRead {
private static Configuration conf; private static Configuration conf;
@ -67,8 +67,8 @@ public class TestTracingShortCircuitLocalRead {
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
TestTracing.SetSpanReceiver.class.getName()); TestTracing.SetSpanReceiver.class.getName());
conf.setLong("dfs.blocksize", 100 * 1024); conf.setLong("dfs.blocksize", 100 * 1024);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
"testShortCircuitTraceHooks._PORT"); "testShortCircuitTraceHooks._PORT");
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");