HDFS-8082. Move dfs.client.read.*, dfs.client.short.circuit.*, dfs.client.mmap.* and dfs.client.hedged.read.* conf from DFSConfigKeys to HdfsClientConfigKeys.
This commit is contained in:
parent
80a2a12423
commit
75bbcc8bf3
|
@ -32,8 +32,8 @@ public interface HdfsClientConfigKeys {
|
||||||
|
|
||||||
static final String PREFIX = "dfs.client.";
|
static final String PREFIX = "dfs.client.";
|
||||||
|
|
||||||
/** Client retry configuration properties */
|
/** dfs.client.retry configuration properties */
|
||||||
public interface Retry {
|
interface Retry {
|
||||||
String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";
|
||||||
|
|
||||||
String POLICY_ENABLED_KEY = PREFIX + "policy.enabled";
|
String POLICY_ENABLED_KEY = PREFIX + "policy.enabled";
|
||||||
|
@ -53,7 +53,7 @@ public interface HdfsClientConfigKeys {
|
||||||
int WINDOW_BASE_DEFAULT = 3000;
|
int WINDOW_BASE_DEFAULT = 3000;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Client failover configuration properties */
|
/** dfs.client.failover configuration properties */
|
||||||
interface Failover {
|
interface Failover {
|
||||||
String PREFIX = HdfsClientConfigKeys.PREFIX + "failover.";
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "failover.";
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ public interface HdfsClientConfigKeys {
|
||||||
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** dfs.client.write configuration properties */
|
||||||
interface Write {
|
interface Write {
|
||||||
String PREFIX = HdfsClientConfigKeys.PREFIX + "write.";
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "write.";
|
||||||
|
|
||||||
|
@ -92,6 +93,7 @@ public interface HdfsClientConfigKeys {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** dfs.client.block.write configuration properties */
|
||||||
interface BlockWrite {
|
interface BlockWrite {
|
||||||
String PREFIX = HdfsClientConfigKeys.PREFIX + "block.write.";
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "block.write.";
|
||||||
|
|
||||||
|
@ -114,7 +116,60 @@ public interface HdfsClientConfigKeys {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** HTTP client configuration properties */
|
/** dfs.client.read configuration properties */
|
||||||
|
interface Read {
|
||||||
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "read.";
|
||||||
|
|
||||||
|
String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
|
||||||
|
|
||||||
|
interface ShortCircuit {
|
||||||
|
String PREFIX = Read.PREFIX + "shortcircuit.";
|
||||||
|
|
||||||
|
String KEY = PREFIX.substring(0, PREFIX.length()-1);
|
||||||
|
boolean DEFAULT = false;
|
||||||
|
String SKIP_CHECKSUM_KEY = PREFIX + "skip.checksum";
|
||||||
|
boolean SKIP_CHECKSUM_DEFAULT = false;
|
||||||
|
String BUFFER_SIZE_KEY = PREFIX + "buffer.size";
|
||||||
|
int BUFFER_SIZE_DEFAULT = 1024 * 1024;
|
||||||
|
|
||||||
|
String STREAMS_CACHE_SIZE_KEY = PREFIX + "streams.cache.size";
|
||||||
|
int STREAMS_CACHE_SIZE_DEFAULT = 256;
|
||||||
|
String STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms";
|
||||||
|
long STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** dfs.client.short.circuit configuration properties */
|
||||||
|
interface ShortCircuit {
|
||||||
|
String PREFIX = Read.PREFIX + "short.circuit.";
|
||||||
|
|
||||||
|
String REPLICA_STALE_THRESHOLD_MS_KEY = PREFIX + "replica.stale.threshold.ms";
|
||||||
|
long REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30*MINUTE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** dfs.client.mmap configuration properties */
|
||||||
|
interface Mmap {
|
||||||
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "mmap.";
|
||||||
|
|
||||||
|
String ENABLED_KEY = PREFIX + "enabled";
|
||||||
|
boolean ENABLED_DEFAULT = true;
|
||||||
|
String CACHE_SIZE_KEY = PREFIX + "cache.size";
|
||||||
|
int CACHE_SIZE_DEFAULT = 256;
|
||||||
|
String CACHE_TIMEOUT_MS_KEY = PREFIX + "cache.timeout.ms";
|
||||||
|
long CACHE_TIMEOUT_MS_DEFAULT = 60*MINUTE;
|
||||||
|
String RETRY_TIMEOUT_MS_KEY = PREFIX + "retry.timeout.ms";
|
||||||
|
long RETRY_TIMEOUT_MS_DEFAULT = 5*MINUTE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** dfs.client.hedged.read configuration properties */
|
||||||
|
interface HedgedRead {
|
||||||
|
String THRESHOLD_MILLIS_KEY = PREFIX + "threshold.millis";
|
||||||
|
long THRESHOLD_MILLIS_DEFAULT = 500;
|
||||||
|
String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size";
|
||||||
|
int THREADPOOL_SIZE_DEFAULT = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** dfs.http.client configuration properties */
|
||||||
interface HttpClient {
|
interface HttpClient {
|
||||||
String PREFIX = "dfs.http.client.";
|
String PREFIX = "dfs.http.client.";
|
||||||
|
|
||||||
|
|
|
@ -438,6 +438,10 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-8144. Split TestLazyPersistFiles into multiple tests. (Arpit Agarwal)
|
HDFS-8144. Split TestLazyPersistFiles into multiple tests. (Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-8082. Move dfs.client.read.*, dfs.client.short.circuit.*,
|
||||||
|
dfs.client.mmap.* and dfs.client.hedged.read.* conf from DFSConfigKeys
|
||||||
|
to HdfsClientConfigKeys. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
|
@ -307,7 +308,7 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
|
throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
|
||||||
"buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
|
"buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
|
||||||
"a single chunk (" + bytesPerChecksum + "). Please configure " +
|
"a single chunk (" + bytesPerChecksum + "). Please configure " +
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY +
|
HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
|
||||||
" appropriately");
|
" appropriately");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -232,7 +232,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
private final CachingStrategy defaultReadCachingStrategy;
|
private final CachingStrategy defaultReadCachingStrategy;
|
||||||
private final CachingStrategy defaultWriteCachingStrategy;
|
private final CachingStrategy defaultWriteCachingStrategy;
|
||||||
private final ClientContext clientContext;
|
private final ClientContext clientContext;
|
||||||
private volatile long hedgedReadThresholdMillis;
|
|
||||||
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
|
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
|
||||||
new DFSHedgedReadMetrics();
|
new DFSHedgedReadMetrics();
|
||||||
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
||||||
|
@ -367,14 +367,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
this.clientContext = ClientContext.get(
|
this.clientContext = ClientContext.get(
|
||||||
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
|
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
|
||||||
dfsClientConf);
|
dfsClientConf);
|
||||||
this.hedgedReadThresholdMillis = conf.getLong(
|
|
||||||
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
|
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
|
||||||
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
|
this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
|
||||||
int numThreads = conf.getInt(
|
|
||||||
DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
|
|
||||||
DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
|
|
||||||
if (numThreads > 0) {
|
|
||||||
this.initThreadsNumForHedgedReads(numThreads);
|
|
||||||
}
|
}
|
||||||
this.saslClient = new SaslDataTransferClient(
|
this.saslClient = new SaslDataTransferClient(
|
||||||
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
|
@ -3133,15 +3128,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
long getHedgedReadTimeout() {
|
|
||||||
return this.hedgedReadThresholdMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
void setHedgedReadTimeout(long timeoutMillis) {
|
|
||||||
this.hedgedReadThresholdMillis = timeoutMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadPoolExecutor getHedgedReadsThreadPool() {
|
ThreadPoolExecutor getHedgedReadsThreadPool() {
|
||||||
return HEDGED_READ_THREAD_POOL;
|
return HEDGED_READ_THREAD_POOL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -688,7 +688,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// client retry confs are moved to HdfsClientConfigKeys.Retry
|
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
|
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
|
||||||
= HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY;
|
= HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY;
|
||||||
|
@ -726,7 +726,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT
|
public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT
|
||||||
= HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT;
|
= HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT;
|
||||||
|
|
||||||
// client failover confs are moved to HdfsClientConfigKeys.Failover
|
// dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
|
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
|
||||||
= HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
|
= HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
|
||||||
|
@ -761,7 +761,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
|
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
|
||||||
= HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
|
= HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
|
||||||
|
|
||||||
// client write confs are moved to HdfsClientConfigKeys.Write
|
// dfs.client.write confs are moved to HdfsClientConfigKeys.Write
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY
|
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY
|
||||||
= HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY;
|
= HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY;
|
||||||
|
@ -799,7 +799,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
|
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
|
||||||
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT;
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT;
|
||||||
|
|
||||||
// client block.write confs are moved to HdfsClientConfigKeys.BlockWrite
|
// dfs.client.block.write confs are moved to HdfsClientConfigKeys.BlockWrite
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY
|
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY
|
||||||
= HdfsClientConfigKeys.BlockWrite.RETRIES_KEY;
|
= HdfsClientConfigKeys.BlockWrite.RETRIES_KEY;
|
||||||
|
@ -837,6 +837,88 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT
|
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT
|
||||||
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT;
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT;
|
||||||
|
|
||||||
|
// dfs.client.read confs are moved to HdfsClientConfigKeys.Read
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT;
|
||||||
|
|
||||||
|
// dfs.client.mmap confs are moved to HdfsClientConfigKeys.Mmap
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_MMAP_ENABLED
|
||||||
|
= HdfsClientConfigKeys.Mmap.ENABLED_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_MMAP_ENABLED_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_MMAP_CACHE_SIZE
|
||||||
|
= HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS
|
||||||
|
= HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS
|
||||||
|
= HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT;
|
||||||
|
|
||||||
|
// dfs.client.short.circuit confs are moved to HdfsClientConfigKeys.ShortCircuit
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS
|
||||||
|
= HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT;
|
||||||
|
|
||||||
|
// dfs.client.hedged.read confs are moved to HdfsClientConfigKeys.HedgedRead
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
|
||||||
|
= HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS
|
||||||
|
= HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE
|
||||||
|
= HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE
|
||||||
|
= HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -883,30 +965,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
|
|
||||||
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
|
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
|
||||||
|
|
||||||
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
|
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
|
|
||||||
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
|
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
|
|
||||||
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
|
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
|
|
||||||
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
|
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
|
|
||||||
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 256;
|
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
|
|
||||||
public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5 * 60 * 1000;
|
|
||||||
|
|
||||||
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
|
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
|
||||||
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
|
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
|
||||||
public static final String DFS_CLIENT_MMAP_ENABLED= "dfs.client.mmap.enabled";
|
|
||||||
public static final boolean DFS_CLIENT_MMAP_ENABLED_DEFAULT = true;
|
|
||||||
public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
|
|
||||||
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 256;
|
|
||||||
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
|
|
||||||
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 60 * 60 * 1000;
|
|
||||||
public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS = "dfs.client.mmap.retry.timeout.ms";
|
|
||||||
public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
|
|
||||||
public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms";
|
|
||||||
public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000;
|
|
||||||
|
|
||||||
// The number of NN response dropped by client proactively in each RPC call.
|
// The number of NN response dropped by client proactively in each RPC call.
|
||||||
// For testing NN retry cache, we can set this property with positive value.
|
// For testing NN retry cache, we can set this property with positive value.
|
||||||
|
@ -919,14 +980,4 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.client.key.provider.cache.expiry";
|
"dfs.client.key.provider.cache.expiry";
|
||||||
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
|
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
|
||||||
TimeUnit.DAYS.toMillis(10); // 10 days
|
TimeUnit.DAYS.toMillis(10); // 10 days
|
||||||
|
|
||||||
// hedged read properties
|
|
||||||
public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
|
|
||||||
"dfs.client.hedged.read.threshold.millis";
|
|
||||||
public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
|
|
||||||
500;
|
|
||||||
|
|
||||||
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
|
|
||||||
"dfs.client.hedged.read.threadpool.size";
|
|
||||||
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1196,6 +1196,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
long end, byte[] buf, int offset,
|
long end, byte[] buf, int offset,
|
||||||
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
final DfsClientConf conf = dfsClient.getConf();
|
||||||
ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
|
ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
|
||||||
CompletionService<ByteBuffer> hedgedService =
|
CompletionService<ByteBuffer> hedgedService =
|
||||||
new ExecutorCompletionService<ByteBuffer>(
|
new ExecutorCompletionService<ByteBuffer>(
|
||||||
|
@ -1223,13 +1224,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
futures.add(firstRequest);
|
futures.add(firstRequest);
|
||||||
try {
|
try {
|
||||||
Future<ByteBuffer> future = hedgedService.poll(
|
Future<ByteBuffer> future = hedgedService.poll(
|
||||||
dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
|
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.get();
|
future.get();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
|
DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
|
||||||
+ "ms to read from " + chosenNode.info
|
+ "ms to read from " + chosenNode.info
|
||||||
+ "; spawning hedged read");
|
+ "; spawning hedged read");
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds deprecated keys into the configuration.
|
* Adds deprecated keys into the configuration.
|
||||||
|
@ -85,7 +85,7 @@ public class HdfsConfiguration extends Configuration {
|
||||||
new DeprecationDelta("dfs.name.edits.dir",
|
new DeprecationDelta("dfs.name.edits.dir",
|
||||||
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY),
|
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY),
|
||||||
new DeprecationDelta("dfs.read.prefetch.size",
|
new DeprecationDelta("dfs.read.prefetch.size",
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY),
|
HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY),
|
||||||
new DeprecationDelta("dfs.safemode.extension",
|
new DeprecationDelta("dfs.safemode.extension",
|
||||||
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY),
|
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY),
|
||||||
new DeprecationDelta("dfs.safemode.threshold.pct",
|
new DeprecationDelta("dfs.safemode.threshold.pct",
|
||||||
|
|
|
@ -27,7 +27,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_T
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
||||||
|
@ -99,6 +98,9 @@ public class DfsClientConf {
|
||||||
|
|
||||||
private final ShortCircuitConf shortCircuitConf;
|
private final ShortCircuitConf shortCircuitConf;
|
||||||
|
|
||||||
|
private final long hedgedReadThresholdMillis;
|
||||||
|
private final int hedgedReadThreadpoolSize;
|
||||||
|
|
||||||
public DfsClientConf(Configuration conf) {
|
public DfsClientConf(Configuration conf) {
|
||||||
// The hdfsTimeout is currently the same as the ipc timeout
|
// The hdfsTimeout is currently the same as the ipc timeout
|
||||||
hdfsTimeout = Client.getTimeout(conf);
|
hdfsTimeout = Client.getTimeout(conf);
|
||||||
|
@ -172,7 +174,7 @@ public class DfsClientConf {
|
||||||
excludedNodesCacheExpiry = conf.getLong(
|
excludedNodesCacheExpiry = conf.getLong(
|
||||||
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
||||||
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
||||||
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
prefetchSize = conf.getLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
|
||||||
10 * defaultBlockSize);
|
10 * defaultBlockSize);
|
||||||
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
||||||
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
||||||
|
@ -206,6 +208,13 @@ public class DfsClientConf {
|
||||||
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
||||||
|
|
||||||
shortCircuitConf = new ShortCircuitConf(conf);
|
shortCircuitConf = new ShortCircuitConf(conf);
|
||||||
|
|
||||||
|
hedgedReadThresholdMillis = conf.getLong(
|
||||||
|
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||||
|
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
||||||
|
hedgedReadThreadpoolSize = conf.getInt(
|
||||||
|
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
||||||
|
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataChecksum.Type getChecksumType(Configuration conf) {
|
private DataChecksum.Type getChecksumType(Configuration conf) {
|
||||||
|
@ -468,6 +477,20 @@ public class DfsClientConf {
|
||||||
return slowIoWarningThresholdMs;
|
return slowIoWarningThresholdMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the hedgedReadThresholdMillis
|
||||||
|
*/
|
||||||
|
public long getHedgedReadThresholdMillis() {
|
||||||
|
return hedgedReadThresholdMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the hedgedReadThreadpoolSize
|
||||||
|
*/
|
||||||
|
public int getHedgedReadThreadpoolSize() {
|
||||||
|
return hedgedReadThreadpoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the shortCircuitConf
|
* @return the shortCircuitConf
|
||||||
*/
|
*/
|
||||||
|
@ -520,8 +543,8 @@ public class DfsClientConf {
|
||||||
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
||||||
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
||||||
shortCircuitLocalReads = conf.getBoolean(
|
shortCircuitLocalReads = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
|
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT);
|
||||||
domainSocketDataTraffic = conf.getBoolean(
|
domainSocketDataTraffic = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
||||||
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
||||||
|
@ -532,7 +555,7 @@ public class DfsClientConf {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
||||||
+ " = " + useLegacyBlockReaderLocal);
|
+ " = " + useLegacyBlockReaderLocal);
|
||||||
LOG.debug(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
|
LOG.debug(HdfsClientConfigKeys.Read.ShortCircuit.KEY
|
||||||
+ " = " + shortCircuitLocalReads);
|
+ " = " + shortCircuitLocalReads);
|
||||||
LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
||||||
+ " = " + domainSocketDataTraffic);
|
+ " = " + domainSocketDataTraffic);
|
||||||
|
@ -541,32 +564,32 @@ public class DfsClientConf {
|
||||||
}
|
}
|
||||||
|
|
||||||
skipShortCircuitChecksums = conf.getBoolean(
|
skipShortCircuitChecksums = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
|
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
|
||||||
shortCircuitBufferSize = conf.getInt(
|
shortCircuitBufferSize = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
|
HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
|
HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
|
||||||
shortCircuitStreamsCacheSize = conf.getInt(
|
shortCircuitStreamsCacheSize = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
|
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
|
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
|
||||||
shortCircuitStreamsCacheExpiryMs = conf.getLong(
|
shortCircuitStreamsCacheExpiryMs = conf.getLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
||||||
shortCircuitMmapEnabled = conf.getBoolean(
|
shortCircuitMmapEnabled = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
|
HdfsClientConfigKeys.Mmap.ENABLED_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
|
HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT);
|
||||||
shortCircuitMmapCacheSize = conf.getInt(
|
shortCircuitMmapCacheSize = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
|
HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
|
HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT);
|
||||||
shortCircuitMmapCacheExpiryMs = conf.getLong(
|
shortCircuitMmapCacheExpiryMs = conf.getLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
|
HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
|
HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT);
|
||||||
shortCircuitMmapCacheRetryTimeout = conf.getLong(
|
shortCircuitMmapCacheRetryTimeout = conf.getLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
|
HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
|
HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT);
|
||||||
shortCircuitCacheStaleThresholdMs = conf.getLong(
|
shortCircuitCacheStaleThresholdMs = conf.getLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
|
HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
|
HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
|
||||||
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
|
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
|
||||||
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
||||||
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
||||||
|
|
|
@ -38,8 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||||
|
@ -87,10 +87,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import com.google.common.cache.CacheBuilder;
|
|
||||||
import com.google.common.cache.CacheLoader;
|
|
||||||
import com.google.common.cache.LoadingCache;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -102,13 +98,14 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalFileSystem;
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.net.DomainPeerServer;
|
import org.apache.hadoop.hdfs.net.DomainPeerServer;
|
||||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -142,8 +139,8 @@ import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
|
@ -186,7 +183,10 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.tracing.SpanReceiverHost;
|
||||||
|
import org.apache.hadoop.tracing.SpanReceiverInfo;
|
||||||
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
|
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
|
||||||
|
import org.apache.hadoop.tracing.TraceAdminProtocol;
|
||||||
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
|
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
|
||||||
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
@ -198,14 +198,15 @@ import org.apache.hadoop.util.ServicePlugin;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.tracing.SpanReceiverHost;
|
|
||||||
import org.apache.hadoop.tracing.SpanReceiverInfo;
|
|
||||||
import org.apache.hadoop.tracing.TraceAdminProtocol;
|
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import com.google.common.cache.CacheLoader;
|
||||||
|
import com.google.common.cache.LoadingCache;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
|
|
||||||
/**********************************************************
|
/**********************************************************
|
||||||
|
@ -410,8 +411,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
|
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
|
||||||
|
|
||||||
// Determine whether we should try to pass file descriptors to clients.
|
// Determine whether we should try to pass file descriptors to clients.
|
||||||
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
|
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
|
||||||
String reason = DomainSocket.getLoadingFailureReason();
|
String reason = DomainSocket.getLoadingFailureReason();
|
||||||
if (reason != null) {
|
if (reason != null) {
|
||||||
LOG.warn("File descriptor passing is disabled because " + reason);
|
LOG.warn("File descriptor passing is disabled because " + reason);
|
||||||
|
@ -927,8 +928,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
this.dataXceiverServer = new Daemon(threadGroup, xserver);
|
this.dataXceiverServer = new Daemon(threadGroup, xserver);
|
||||||
this.threadGroup.setDaemon(true); // auto destroy when empty
|
this.threadGroup.setDaemon(true); // auto destroy when empty
|
||||||
|
|
||||||
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
|
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
|
||||||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
||||||
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
|
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
|
||||||
DomainPeerServer domainPeerServer =
|
DomainPeerServer domainPeerServer =
|
||||||
|
@ -943,14 +944,14 @@ public class DataNode extends ReconfigurableBase
|
||||||
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
|
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static DomainPeerServer getDomainPeerServer(Configuration conf,
|
private static DomainPeerServer getDomainPeerServer(Configuration conf,
|
||||||
int port) throws IOException {
|
int port) throws IOException {
|
||||||
String domainSocketPath =
|
String domainSocketPath =
|
||||||
conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
||||||
if (domainSocketPath.isEmpty()) {
|
if (domainSocketPath.isEmpty()) {
|
||||||
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) &&
|
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) &&
|
||||||
(!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
(!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
||||||
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
|
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
|
||||||
LOG.warn("Although short-circuit local reads are configured, " +
|
LOG.warn("Although short-circuit local reads are configured, " +
|
||||||
|
|
|
@ -37,8 +37,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||||
import org.apache.hadoop.hdfs.net.DomainPeer;
|
import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||||
|
@ -339,27 +337,6 @@ public class ShortCircuitCache implements Closeable {
|
||||||
*/
|
*/
|
||||||
private final DfsClientShmManager shmManager;
|
private final DfsClientShmManager shmManager;
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a {@link ShortCircuitCache} object from a {@link Configuration}
|
|
||||||
*/
|
|
||||||
public static ShortCircuitCache fromConf(Configuration conf) {
|
|
||||||
return new ShortCircuitCache(
|
|
||||||
conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
|
|
||||||
conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT),
|
|
||||||
conf.getInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
|
|
||||||
conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
|
|
||||||
conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
|
|
||||||
conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
|
|
||||||
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT),
|
|
||||||
conf.getInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
|
||||||
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
|
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
|
||||||
return new ShortCircuitCache(
|
return new ShortCircuitCache(
|
||||||
conf.getShortCircuitStreamsCacheSize(),
|
conf.getShortCircuitStreamsCacheSize(),
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.fs;
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
||||||
|
@ -48,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
|
@ -120,15 +119,15 @@ public class TestEnhancedByteBufferAccess {
|
||||||
Assume.assumeTrue(NativeIO.isAvailable());
|
Assume.assumeTrue(NativeIO.isAvailable());
|
||||||
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
|
conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 3);
|
||||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
|
conf.setLong(HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY, 100);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(),
|
new File(sockDir.getDir(),
|
||||||
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
|
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
|
true);
|
||||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
|
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
|
||||||
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
|
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
|
||||||
|
@ -597,8 +596,8 @@ public class TestEnhancedByteBufferAccess {
|
||||||
final Path TEST_PATH = new Path("/a");
|
final Path TEST_PATH = new Path("/a");
|
||||||
final int RANDOM_SEED = 23453;
|
final int RANDOM_SEED = 23453;
|
||||||
HdfsConfiguration conf = initZeroCopyTest();
|
HdfsConfiguration conf = initZeroCopyTest();
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
false);
|
||||||
final String CONTEXT = "testZeroCopyReadOfCachedData";
|
final String CONTEXT = "testZeroCopyReadOfCachedData";
|
||||||
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
|
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
|
||||||
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
||||||
|
@ -715,7 +714,7 @@ public class TestEnhancedByteBufferAccess {
|
||||||
@Test
|
@Test
|
||||||
public void testClientMmapDisable() throws Exception {
|
public void testClientMmapDisable() throws Exception {
|
||||||
HdfsConfiguration conf = initZeroCopyTest();
|
HdfsConfiguration conf = initZeroCopyTest();
|
||||||
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false);
|
conf.setBoolean(HdfsClientConfigKeys.Mmap.ENABLED_KEY, false);
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
final Path TEST_PATH = new Path("/a");
|
final Path TEST_PATH = new Path("/a");
|
||||||
final int TEST_FILE_LENGTH = 16385;
|
final int TEST_FILE_LENGTH = 16385;
|
||||||
|
@ -726,8 +725,8 @@ public class TestEnhancedByteBufferAccess {
|
||||||
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
|
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory
|
// With HdfsClientConfigKeys.Mmap.ENABLED_KEY set to false,
|
||||||
// mapped reads.
|
// we should not do memory mapped reads.
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
|
@ -751,9 +750,9 @@ public class TestEnhancedByteBufferAccess {
|
||||||
fs = null;
|
fs = null;
|
||||||
cluster = null;
|
cluster = null;
|
||||||
try {
|
try {
|
||||||
// Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0. It should work.
|
// Now try again with HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY == 0.
|
||||||
conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true);
|
conf.setBoolean(HdfsClientConfigKeys.Mmap.ENABLED_KEY, true);
|
||||||
conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0);
|
conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 0);
|
||||||
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
|
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
|
@ -20,12 +20,12 @@ package org.apache.hadoop.fs;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.PeerCache;
|
import org.apache.hadoop.hdfs.PeerCache;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -47,7 +47,7 @@ public class TestUnbuffer {
|
||||||
|
|
||||||
// Disable short-circuit reads. With short-circuit, we wouldn't hold open a
|
// Disable short-circuit reads. With short-circuit, we wouldn't hold open a
|
||||||
// TCP socket.
|
// TCP socket.
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
|
|
||||||
// Set a really long socket timeout to avoid test timing issues.
|
// Set a really long socket timeout to avoid test timing issues.
|
||||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
|
@ -100,7 +100,7 @@ public class TestUnbuffer {
|
||||||
public void testOpenManyFilesViaTcp() throws Exception {
|
public void testOpenManyFilesViaTcp() throws Exception {
|
||||||
final int NUM_OPENS = 500;
|
final int NUM_OPENS = 500;
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
|
FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1462,7 +1462,7 @@ public class DFSTestUtil {
|
||||||
|
|
||||||
public Configuration newConfiguration() {
|
public Configuration newConfiguration() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(),
|
new File(sockDir.getDir(),
|
||||||
testName + "._PORT.sock").getAbsolutePath());
|
testName + "._PORT.sock").getAbsolutePath());
|
||||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
@ -43,6 +41,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
||||||
|
@ -81,8 +80,8 @@ public class TestBlockReaderFactory {
|
||||||
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
|
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
|
||||||
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||||
testName + "._PORT").getAbsolutePath());
|
testName + "._PORT").getAbsolutePath());
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
false);
|
false);
|
||||||
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
||||||
return conf;
|
return conf;
|
||||||
|
@ -108,7 +107,7 @@ public class TestBlockReaderFactory {
|
||||||
"testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
|
"testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
|
||||||
clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
||||||
Configuration serverConf = new Configuration(clientConf);
|
Configuration serverConf = new Configuration(clientConf);
|
||||||
serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
|
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.fs.ChecksumException;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -123,8 +124,8 @@ public class TestBlockReaderLocal {
|
||||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
|
!checksum);
|
||||||
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
||||||
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||||
|
@ -721,10 +722,10 @@ public class TestBlockReaderLocal {
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
|
new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
|
||||||
getAbsolutePath());
|
getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
} else {
|
} else {
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
}
|
}
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
final Path TEST_PATH = new Path("/a");
|
final Path TEST_PATH = new Path("/a");
|
||||||
|
|
|
@ -63,9 +63,9 @@ public class TestBlockReaderLocalLegacy {
|
||||||
new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
|
new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
|
||||||
getAbsolutePath());
|
getAbsolutePath());
|
||||||
}
|
}
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
false);
|
false);
|
||||||
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
|
|
@ -17,9 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
@ -29,7 +27,7 @@ public class TestParallelRead extends TestParallelReadUtil {
|
||||||
// This is a test of the normal (TCP) read path. For this reason, we turn
|
// This is a test of the normal (TCP) read path. For this reason, we turn
|
||||||
// off both short-circuit local reads and UNIX domain socket data traffic.
|
// off both short-circuit local reads and UNIX domain socket data traffic.
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
||||||
false);
|
false);
|
||||||
// dfs.domain.socket.path should be ignored because the previous two keys
|
// dfs.domain.socket.path should be ignored because the previous two keys
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -30,9 +31,9 @@ public class TestParallelShortCircuitLegacyRead extends TestParallelReadUtil {
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
false);
|
||||||
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
|
|
|
@ -17,15 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
|
||||||
|
|
||||||
public class TestParallelShortCircuitRead extends TestParallelReadUtil {
|
public class TestParallelShortCircuitRead extends TestParallelReadUtil {
|
||||||
private static TemporarySocketDirectory sockDir;
|
private static TemporarySocketDirectory sockDir;
|
||||||
|
@ -38,9 +40,9 @@ public class TestParallelShortCircuitRead extends TestParallelReadUtil {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
false);
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
setupCluster(1, conf);
|
setupCluster(1, conf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,15 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
|
||||||
|
|
||||||
public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
|
public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
|
||||||
private static TemporarySocketDirectory sockDir;
|
private static TemporarySocketDirectory sockDir;
|
||||||
|
@ -38,9 +40,9 @@ public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
|
true);
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
setupCluster(1, conf);
|
setupCluster(1, conf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,14 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests short-circuit local reads without any FileInputStream or
|
* This class tests short-circuit local reads without any FileInputStream or
|
||||||
|
@ -41,13 +44,13 @@ public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(),
|
new File(sockDir.getDir(),
|
||||||
"TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
|
"TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
// Enabling data transfer encryption should have no effect when using
|
// Enabling data transfer encryption should have no effect when using
|
||||||
// short-circuit local reads. This is a regression test for HDFS-5353.
|
// short-circuit local reads. This is a regression test for HDFS-5353.
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
false);
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(DFSConfigKeys.
|
||||||
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
||||||
// We want to test reading from stale sockets.
|
// We want to test reading from stale sockets.
|
||||||
|
@ -56,8 +59,8 @@ public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
|
||||||
5 * 60 * 1000);
|
5 * 60 * 1000);
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
|
||||||
// Avoid using the FileInputStreamCache.
|
// Avoid using the FileInputStreamCache.
|
||||||
conf.setInt(DFSConfigKeys.
|
conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0);
|
0);
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
DFSInputStream.tcpReadsDisabledForTesting = true;
|
DFSInputStream.tcpReadsDisabledForTesting = true;
|
||||||
setupCluster(1, conf);
|
setupCluster(1, conf);
|
||||||
|
|
|
@ -17,15 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
|
||||||
|
|
||||||
public class TestParallelUnixDomainRead extends TestParallelReadUtil {
|
public class TestParallelUnixDomainRead extends TestParallelReadUtil {
|
||||||
private static TemporarySocketDirectory sockDir;
|
private static TemporarySocketDirectory sockDir;
|
||||||
|
@ -38,7 +40,7 @@ public class TestParallelUnixDomainRead extends TestParallelReadUtil {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
setupCluster(1, conf);
|
setupCluster(1, conf);
|
||||||
|
|
|
@ -273,8 +273,8 @@ public class TestPread {
|
||||||
public void testHedgedPreadDFSBasic() throws IOException {
|
public void testHedgedPreadDFSBasic() throws IOException {
|
||||||
isHedgedRead = true;
|
isHedgedRead = true;
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
|
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 5);
|
||||||
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1);
|
conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 1);
|
||||||
dfsPreadTest(conf, false, true); // normal pread
|
dfsPreadTest(conf, false, true); // normal pread
|
||||||
dfsPreadTest(conf, true, true); // trigger read code path without
|
dfsPreadTest(conf, true, true); // trigger read code path without
|
||||||
// transferTo.
|
// transferTo.
|
||||||
|
@ -286,9 +286,9 @@ public class TestPread {
|
||||||
int numHedgedReadPoolThreads = 5;
|
int numHedgedReadPoolThreads = 5;
|
||||||
final int hedgedReadTimeoutMillis = 50;
|
final int hedgedReadTimeoutMillis = 50;
|
||||||
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
|
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
||||||
numHedgedReadPoolThreads);
|
numHedgedReadPoolThreads);
|
||||||
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
|
conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||||
hedgedReadTimeoutMillis);
|
hedgedReadTimeoutMillis);
|
||||||
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
|
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
|
||||||
// Set up the InjectionHandler
|
// Set up the InjectionHandler
|
||||||
|
@ -362,9 +362,9 @@ public class TestPread {
|
||||||
int numHedgedReadPoolThreads = 5;
|
int numHedgedReadPoolThreads = 5;
|
||||||
final int initialHedgedReadTimeoutMillis = 50000;
|
final int initialHedgedReadTimeoutMillis = 50000;
|
||||||
final int fixedSleepIntervalMillis = 50;
|
final int fixedSleepIntervalMillis = 50;
|
||||||
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
|
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
||||||
numHedgedReadPoolThreads);
|
numHedgedReadPoolThreads);
|
||||||
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
|
conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||||
initialHedgedReadTimeoutMillis);
|
initialHedgedReadTimeoutMillis);
|
||||||
|
|
||||||
// Set up the InjectionHandler
|
// Set up the InjectionHandler
|
||||||
|
@ -404,7 +404,14 @@ public class TestPread {
|
||||||
* that there were hedged reads. But, none of the reads had to run in the
|
* that there were hedged reads. But, none of the reads had to run in the
|
||||||
* current thread.
|
* current thread.
|
||||||
*/
|
*/
|
||||||
dfsClient.setHedgedReadTimeout(50); // 50ms
|
{
|
||||||
|
Configuration conf2 = new Configuration(cluster.getConfiguration(0));
|
||||||
|
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||||
|
conf2.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 50);
|
||||||
|
fileSys.close();
|
||||||
|
fileSys = (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf2);
|
||||||
|
metrics = fileSys.getClient().getHedgedReadMetrics();
|
||||||
|
}
|
||||||
pReadFile(fileSys, file1);
|
pReadFile(fileSys, file1);
|
||||||
// assert that there were hedged reads
|
// assert that there were hedged reads
|
||||||
assertTrue(metrics.getHedgedReadOps() > 0);
|
assertTrue(metrics.getHedgedReadOps() > 0);
|
||||||
|
@ -439,7 +446,7 @@ public class TestPread {
|
||||||
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
|
private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
|
||||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
|
conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, 4096);
|
||||||
// Set short retry timeouts so this test runs faster
|
// Set short retry timeouts so this test runs faster
|
||||||
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
|
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
|
|
|
@ -23,8 +23,6 @@ import java.io.File;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -35,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
@ -84,7 +83,7 @@ public class TestFsDatasetCacheRevocation {
|
||||||
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
||||||
TestFsDatasetCache.CACHE_CAPACITY);
|
TestFsDatasetCache.CACHE_CAPACITY);
|
||||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(), "sock").getAbsolutePath());
|
new File(sockDir.getDir(), "sock").getAbsolutePath());
|
||||||
return conf;
|
return conf;
|
||||||
|
|
|
@ -17,6 +17,34 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||||
|
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -24,12 +52,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
@ -44,25 +73,6 @@ import org.junit.After;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
|
||||||
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
|
||||||
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|
||||||
import static org.hamcrest.core.Is.is;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
public abstract class LazyPersistTestCase {
|
public abstract class LazyPersistTestCase {
|
||||||
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
|
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
|
||||||
|
|
||||||
|
@ -237,7 +247,7 @@ public abstract class LazyPersistTestCase {
|
||||||
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
|
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
|
||||||
|
|
||||||
if (useSCR) {
|
if (useSCR) {
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
// Do not share a client context across tests.
|
// Do not share a client context across tests.
|
||||||
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
||||||
if (useLegacyBlockReaderLocal) {
|
if (useLegacyBlockReaderLocal) {
|
||||||
|
@ -299,7 +309,7 @@ public abstract class LazyPersistTestCase {
|
||||||
|
|
||||||
if (useSCR)
|
if (useSCR)
|
||||||
{
|
{
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,useSCR);
|
||||||
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
||||||
sockDir = new TemporarySocketDirectory();
|
sockDir = new TemporarySocketDirectory();
|
||||||
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||||
|
|
|
@ -20,10 +20,7 @@ package org.apache.hadoop.hdfs.shortcircuit;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY;
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
@ -36,7 +33,6 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import com.google.common.collect.HashMultimap;
|
|
||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -45,12 +41,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.net.DomainPeer;
|
import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -61,8 +57,8 @@ import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisito
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
|
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
||||||
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
|
@ -76,6 +72,7 @@ import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.collect.HashMultimap;
|
||||||
|
|
||||||
public class TestShortCircuitCache {
|
public class TestShortCircuitCache {
|
||||||
static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
|
static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
|
||||||
|
@ -388,8 +385,8 @@ public class TestShortCircuitCache {
|
||||||
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
|
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
|
||||||
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||||
testName).getAbsolutePath());
|
testName).getAbsolutePath());
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
false);
|
false);
|
||||||
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
|
||||||
DFSInputStream.tcpReadsDisabledForTesting = true;
|
DFSInputStream.tcpReadsDisabledForTesting = true;
|
||||||
|
@ -541,7 +538,7 @@ public class TestShortCircuitCache {
|
||||||
"testUnlinkingReplicasInFileDescriptorCache", sockDir);
|
"testUnlinkingReplicasInFileDescriptorCache", sockDir);
|
||||||
// We don't want the CacheCleaner to time out short-circuit shared memory
|
// We don't want the CacheCleaner to time out short-circuit shared memory
|
||||||
// segments during the test, so set the timeout really high.
|
// segments during the test, so set the timeout really high.
|
||||||
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
|
||||||
1000000000L);
|
1000000000L);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
@ -637,7 +634,8 @@ public class TestShortCircuitCache {
|
||||||
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
Configuration conf = createShortCircuitConf(
|
Configuration conf = createShortCircuitConf(
|
||||||
"testDataXceiverCleansUpSlotsOnFailure", sockDir);
|
"testDataXceiverCleansUpSlotsOnFailure", sockDir);
|
||||||
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
conf.setLong(
|
||||||
|
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
|
||||||
1000000000L);
|
1000000000L);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
|
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -247,8 +248,8 @@ public class TestShortCircuitLocalRead {
|
||||||
int readOffset, String shortCircuitUser, String readingUser,
|
int readOffset, String shortCircuitUser, String readingUser,
|
||||||
boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
ignoreChecksum);
|
ignoreChecksum);
|
||||||
// Set a random client context name so that we don't share a cache with
|
// Set a random client context name so that we don't share a cache with
|
||||||
// other invocations of this function.
|
// other invocations of this function.
|
||||||
|
@ -384,8 +385,8 @@ public class TestShortCircuitLocalRead {
|
||||||
public void testSkipWithVerifyChecksum() throws IOException {
|
public void testSkipWithVerifyChecksum() throws IOException {
|
||||||
int size = blockSize;
|
int size = blockSize;
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
"/tmp/testSkipWithVerifyChecksum._PORT");
|
"/tmp/testSkipWithVerifyChecksum._PORT");
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
|
@ -431,8 +432,8 @@ public class TestShortCircuitLocalRead {
|
||||||
public void testHandleTruncatedBlockFile() throws IOException {
|
public void testHandleTruncatedBlockFile() throws IOException {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
"/tmp/testHandleTruncatedBlockFile._PORT");
|
"/tmp/testHandleTruncatedBlockFile._PORT");
|
||||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||||
|
@ -529,10 +530,10 @@ public class TestShortCircuitLocalRead {
|
||||||
|
|
||||||
// Setup create a file
|
// Setup create a file
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, shortcircuit);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
"/tmp/TestShortCircuitLocalRead._PORT");
|
"/tmp/TestShortCircuitLocalRead._PORT");
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
checksum);
|
checksum);
|
||||||
|
|
||||||
//Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
//Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
||||||
|
@ -592,7 +593,7 @@ public class TestShortCircuitLocalRead {
|
||||||
int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
|
int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
||||||
.format(true).build();
|
.format(true).build();
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.tracing;
|
||||||
|
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -26,19 +28,17 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.apache.hadoop.util.NativeCodeLoader;
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
import org.apache.htrace.Sampler;
|
import org.apache.htrace.Sampler;
|
||||||
import org.apache.htrace.Span;
|
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
import org.apache.htrace.TraceScope;
|
import org.apache.htrace.TraceScope;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
public class TestTracingShortCircuitLocalRead {
|
public class TestTracingShortCircuitLocalRead {
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
@ -67,8 +67,8 @@ public class TestTracingShortCircuitLocalRead {
|
||||||
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
|
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
|
||||||
TestTracing.SetSpanReceiver.class.getName());
|
TestTracing.SetSpanReceiver.class.getName());
|
||||||
conf.setLong("dfs.blocksize", 100 * 1024);
|
conf.setLong("dfs.blocksize", 100 * 1024);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
"testShortCircuitTraceHooks._PORT");
|
"testShortCircuitTraceHooks._PORT");
|
||||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||||
|
|
Loading…
Reference in New Issue