HDFS-8803. Move DfsClientConf to hdfs-client. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-19 11:28:05 -07:00
parent c0a4cd978a
commit 535b6db74c
61 changed files with 649 additions and 402 deletions

View File

@ -17,7 +17,12 @@
*/
package org.apache.hadoop.hdfs.client;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.concurrent.TimeUnit;
/** Client configuration properties */
@InterfaceAudience.Private
public interface HdfsClientConfigKeys {
long SECOND = 1000L;
long MINUTE = 60 * SECOND;
@ -31,7 +36,7 @@ public interface HdfsClientConfigKeys {
String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
static final String PREFIX = "dfs.client.";
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
@ -45,6 +50,72 @@ public interface HdfsClientConfigKeys {
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY =
"dfs.client.socketcache.capacity";
int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY =
"dfs.client.socketcache.expiryMsec";
long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES =
"dfs.client.cache.drop.behind.writes";
String DFS_CLIENT_CACHE_DROP_BEHIND_READS =
"dfs.client.cache.drop.behind.reads";
String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
String DFS_CLIENT_CONTEXT = "dfs.client.context";
String DFS_CLIENT_CONTEXT_DEFAULT = "default";
String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS =
"dfs.client.file-block-storage-locations.num-threads";
int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS =
"dfs.client.file-block-storage-locations.timeout.millis";
int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000;
String DFS_CLIENT_USE_LEGACY_BLOCKREADER =
"dfs.client.use.legacy.blockreader";
boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
"dfs.client.use.legacy.blockreader.local";
boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
"dfs.client.datanode-restart.timeout";
long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
// Much code in hdfs is not yet updated to use these keys.
// the initial delay (unit is ms) for locateFollowingBlock, the delay time
// will increase exponentially(double) for each retry.
String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
"dfs.client.max.block.acquire.failures";
int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
"dfs.datanode.socket.write.timeout";
String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =
"dfs.client.domain.socket.data.traffic";
boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS =
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
60000;
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms";
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
"dfs.client.key.provider.cache.expiry";
long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10); // 10 days
String DFS_HDFS_BLOCKS_METADATA_ENABLED =
"dfs.datanode.hdfs-blocks-metadata.enabled";
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
/** dfs.client.retry configuration properties */
interface Retry {

View File

@ -17,49 +17,73 @@
*/
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
/**
* DFSClient configuration
* DFSClient configuration.
*/
public class DfsClientConf {
private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf
.class);
private final int hdfsTimeout; // timeout value for a DFS operation.
@ -76,7 +100,7 @@ public class DfsClientConf {
private final ByteArrayManager.Conf writeByteArrayManagerConf;
private final int socketTimeout;
private final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
/** Wait time window (in msec) if BlockMissingException is caught. */
private final int timeWindow;
private final int numCachedConnRetry;
private final int numBlockWriteRetry;
@ -97,137 +121,138 @@ public class DfsClientConf {
private final long slowIoWarningThresholdMs;
private final ShortCircuitConf shortCircuitConf;
private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;
public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
Retry.MAX_ATTEMPTS_KEY,
Retry.MAX_ATTEMPTS_DEFAULT);
timeWindow = conf.getInt(
HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
Retry.WINDOW_BASE_KEY,
Retry.WINDOW_BASE_DEFAULT);
retryTimesForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
retryIntervalForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
maxFailoverAttempts = conf.getInt(
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
Failover.MAX_ATTEMPTS_KEY,
Failover.MAX_ATTEMPTS_DEFAULT);
failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
Failover.SLEEPTIME_BASE_KEY,
Failover.SLEEPTIME_BASE_DEFAULT);
failoverSleepMaxMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
Failover.SLEEPTIME_MAX_KEY,
Failover.SLEEPTIME_MAX_DEFAULT);
maxBlockAcquireFailures = conf.getInt(
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
datanodeSocketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
datanodeSocketWriteTimeout = conf.getInt(
DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT);
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
HdfsConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
writeMaxPackets = conf.getInt(
HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY,
HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
Write.MAX_PACKETS_IN_FLIGHT_KEY,
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
final boolean byteArrayManagerEnabled = conf.getBoolean(
HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY,
HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_DEFAULT);
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_KEY,
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
countThreshold, countLimit, countResetTimePeriodMs);
}
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
defaultReplication = (short) conf.getInt(
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
excludedNodesCacheExpiry = conf.getLong(
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
numBlockWriteRetry = conf.getInt(
HdfsClientConfigKeys.BlockWrite.RETRIES_KEY,
HdfsClientConfigKeys.BlockWrite.RETRIES_DEFAULT);
BlockWrite.RETRIES_KEY,
BlockWrite.RETRIES_DEFAULT);
numBlockWriteLocateFollowingRetry = conf.getInt(
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
uMask = FsPermission.getUMask(conf);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
hdfsBlocksMetadataEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
fileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
fileBlockStorageLocationsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
slowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
shortCircuitConf = new ShortCircuitConf(conf);
hedgedReadThresholdMillis = conf.getLong(
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT);
HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
hedgedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
HedgedRead.THREADPOOL_SIZE_KEY,
HedgedRead.THREADPOOL_SIZE_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
final String checksum = conf.get(
DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
DFS_CHECKSUM_TYPE_KEY,
DFS_CHECKSUM_TYPE_DEFAULT);
try {
return DataChecksum.Type.valueOf(checksum);
} catch(IllegalArgumentException iae) {
DFSClient.LOG.warn("Bad checksum type: " + checksum + ". Using default "
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
LOG.warn("Bad checksum type: {}. Using default {}", checksum,
DFS_CHECKSUM_TYPE_DEFAULT);
return DataChecksum.Type.valueOf(
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
DFS_CHECKSUM_TYPE_DEFAULT);
}
}
@ -498,8 +523,11 @@ public ShortCircuitConf getShortCircuitConf() {
return shortCircuitConf;
}
/**
* Configuration for short-circuit reads.
*/
public static class ShortCircuitConf {
private static final Log LOG = LogFactory.getLog(ShortCircuitConf.class);
private static final Logger LOG = DfsClientConf.LOG;
private final int socketCacheCapacity;
private final long socketCacheExpiry;
@ -513,9 +541,9 @@ public static class ShortCircuitConf {
private final boolean shortCircuitLocalReads;
private final boolean domainSocketDataTraffic;
private final int shortCircuitStreamsCacheSize;
private final long shortCircuitStreamsCacheExpiryMs;
private final long shortCircuitStreamsCacheExpiryMs;
private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
private final boolean shortCircuitMmapEnabled;
private final int shortCircuitMmapCacheSize;
private final long shortCircuitMmapCacheExpiryMs;
@ -524,10 +552,6 @@ public static class ShortCircuitConf {
private final long keyProviderCacheExpiryMs;
@VisibleForTesting
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();
public ShortCircuitConf(Configuration conf) {
socketCacheCapacity = conf.getInt(
DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
@ -537,66 +561,64 @@ public ShortCircuitConf(Configuration conf) {
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
useLegacyBlockReader = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
useLegacyBlockReaderLocal = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
shortCircuitLocalReads = conf.getBoolean(
HdfsClientConfigKeys.Read.ShortCircuit.KEY,
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT);
Read.ShortCircuit.KEY,
Read.ShortCircuit.DEFAULT);
domainSocketDataTraffic = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
domainSocketPath = conf.getTrimmed(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
DFS_DOMAIN_SOCKET_PATH_KEY,
DFS_DOMAIN_SOCKET_PATH_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = " + useLegacyBlockReaderLocal);
LOG.debug(HdfsClientConfigKeys.Read.ShortCircuit.KEY
+ " = " + shortCircuitLocalReads);
LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = " + domainSocketDataTraffic);
LOG.debug(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+ " = " + domainSocketPath);
}
LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = {}", useLegacyBlockReaderLocal);
LOG.debug(Read.ShortCircuit.KEY
+ " = {}", shortCircuitLocalReads);
LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = {}", domainSocketDataTraffic);
LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY
+ " = {}", domainSocketPath);
skipShortCircuitChecksums = conf.getBoolean(
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
Read.ShortCircuit.SKIP_CHECKSUM_KEY,
Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt(
HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY,
HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
Read.ShortCircuit.BUFFER_SIZE_KEY,
Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
shortCircuitStreamsCacheSize = conf.getInt(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
shortCircuitStreamsCacheExpiryMs = conf.getLong(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
shortCircuitMmapEnabled = conf.getBoolean(
HdfsClientConfigKeys.Mmap.ENABLED_KEY,
HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT);
Mmap.ENABLED_KEY,
Mmap.ENABLED_DEFAULT);
shortCircuitMmapCacheSize = conf.getInt(
HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY,
HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT);
Mmap.CACHE_SIZE_KEY,
Mmap.CACHE_SIZE_DEFAULT);
shortCircuitMmapCacheExpiryMs = conf.getLong(
HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY,
HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT);
Mmap.CACHE_TIMEOUT_MS_KEY,
Mmap.CACHE_TIMEOUT_MS_DEFAULT);
shortCircuitMmapCacheRetryTimeout = conf.getLong(
HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY,
HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT);
Mmap.RETRY_TIMEOUT_MS_KEY,
Mmap.RETRY_TIMEOUT_MS_DEFAULT);
shortCircuitCacheStaleThresholdMs = conf.getLong(
HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
keyProviderCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
}
/**

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client.impl;

View File

@ -78,6 +78,13 @@ public class HdfsConstants {
public static final String CLIENT_NAMENODE_PROTOCOL_NAME =
"org.apache.hadoop.hdfs.protocol.ClientProtocol";
// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;
public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
//for write pipeline
public static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000;
// SafeMode actions
public enum SafeModeAction {
SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET

View File

@ -22,29 +22,30 @@
import java.util.Map;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manage byte array creation and release.
* Manage byte array creation and release.
*/
@InterfaceAudience.Private
public abstract class ByteArrayManager {
static final Log LOG = LogFactory.getLog(ByteArrayManager.class);
private static final ThreadLocal<StringBuilder> debugMessage = new ThreadLocal<StringBuilder>() {
static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE =
new ThreadLocal<StringBuilder>() {
protected StringBuilder initialValue() {
return new StringBuilder();
}
};
private static void logDebugMessage() {
final StringBuilder b = debugMessage.get();
LOG.debug(b);
final StringBuilder b = DEBUG_MESSAGE.get();
LOG.debug(b.toString());
b.setLength(0);
}
@ -97,7 +98,7 @@ synchronized long getCount() {
/**
* Increment the counter, and reset it if there is no increment
* for acertain time period.
* for a certain time period.
*
* @return the new count.
*/
@ -112,10 +113,10 @@ synchronized long increment() {
}
/** A map from integers to counters. */
static class CounterMap {
static final class CounterMap {
/** @see ByteArrayManager.Conf#countResetTimePeriodMs */
private final long countResetTimePeriodMs;
private final Map<Integer, Counter> map = new HashMap<Integer, Counter>();
private final Map<Integer, Counter> map = new HashMap<>();
private CounterMap(long countResetTimePeriodMs) {
this.countResetTimePeriodMs = countResetTimePeriodMs;
@ -125,7 +126,8 @@ private CounterMap(long countResetTimePeriodMs) {
* @return the counter for the given key;
* and create a new counter if it does not exist.
*/
synchronized Counter get(final Integer key, final boolean createIfNotExist) {
synchronized Counter get(final Integer key, final boolean
createIfNotExist) {
Counter count = map.get(key);
if (count == null && createIfNotExist) {
count = new Counter(countResetTimePeriodMs);
@ -133,17 +135,13 @@ synchronized Counter get(final Integer key, final boolean createIfNotExist) {
}
return count;
}
synchronized void clear() {
map.clear();
}
}
/** Manage byte arrays with the same fixed length. */
static class FixedLengthManager {
private final int byteArrayLength;
private final int maxAllocated;
private final Queue<byte[]> freeQueue = new LinkedList<byte[]>();
private final Queue<byte[]> freeQueue = new LinkedList<>();
private int numAllocated = 0;
@ -157,31 +155,31 @@ static class FixedLengthManager {
*
* If the number of allocated arrays >= maximum, the current thread is
* blocked until the number of allocated arrays drops to below the maximum.
*
*
* The byte array allocated by this method must be returned for recycling
* via the {@link FixedLengthManager#recycle(byte[])} method.
*/
synchronized byte[] allocate() throws InterruptedException {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", ").append(this);
DEBUG_MESSAGE.get().append(", ").append(this);
}
for(; numAllocated >= maxAllocated;) {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(": wait ...");
DEBUG_MESSAGE.get().append(": wait ...");
logDebugMessage();
}
wait();
if (LOG.isDebugEnabled()) {
debugMessage.get().append("wake up: ").append(this);
DEBUG_MESSAGE.get().append("wake up: ").append(this);
}
}
numAllocated++;
final byte[] array = freeQueue.poll();
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", recycled? ").append(array != null);
DEBUG_MESSAGE.get().append(", recycled? ").append(array != null);
}
return array != null? array : new byte[byteArrayLength];
}
@ -197,7 +195,7 @@ synchronized int recycle(byte[] array) {
Preconditions.checkNotNull(array);
Preconditions.checkArgument(array.length == byteArrayLength);
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", ").append(this);
DEBUG_MESSAGE.get().append(", ").append(this);
}
notify();
@ -210,7 +208,7 @@ synchronized int recycle(byte[] array) {
if (freeQueue.size() < maxAllocated - numAllocated) {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", freeQueue.offer");
DEBUG_MESSAGE.get().append(", freeQueue.offer");
}
freeQueue.offer(array);
}
@ -227,7 +225,7 @@ public synchronized String toString() {
/** A map from array lengths to byte array managers. */
static class ManagerMap {
private final int countLimit;
private final Map<Integer, FixedLengthManager> map = new HashMap<Integer, FixedLengthManager>();
private final Map<Integer, FixedLengthManager> map = new HashMap<>();
ManagerMap(int countLimit) {
this.countLimit = countLimit;
@ -243,12 +241,11 @@ synchronized FixedLengthManager get(final Integer arrayLength,
}
return manager;
}
synchronized void clear() {
map.clear();
}
}
/**
* Configuration for ByteArrayManager.
*/
public static class Conf {
/**
* The count threshold for each array length so that a manager is created
@ -265,7 +262,8 @@ public static class Conf {
*/
private final long countResetTimePeriodMs;
public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) {
public Conf(int countThreshold, int countLimit, long
countResetTimePeriodMs) {
this.countThreshold = countThreshold;
this.countLimit = countLimit;
this.countResetTimePeriodMs = countResetTimePeriodMs;
@ -277,20 +275,20 @@ public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) {
* the returned array is larger than or equal to the given length.
*
* The current thread may be blocked if some resource is unavailable.
*
*
* The byte array created by this method must be released
* via the {@link ByteArrayManager#release(byte[])} method.
*
* @return a byte array with length larger than or equal to the given length.
*/
public abstract byte[] newByteArray(int size) throws InterruptedException;
/**
* Release the given byte array.
*
*
* The byte array may or may not be created
* by the {@link ByteArrayManager#newByteArray(int)} method.
*
*
* @return the number of free array.
*/
public abstract int release(byte[] array);
@ -307,7 +305,7 @@ static class NewByteArrayWithoutLimit extends ByteArrayManager {
public byte[] newByteArray(int size) throws InterruptedException {
return new byte[size];
}
@Override
public int release(byte[] array) {
return 0;
@ -320,38 +318,41 @@ public int release(byte[] array) {
*/
static class Impl extends ByteArrayManager {
private final Conf conf;
private final CounterMap counters;
private final ManagerMap managers;
Impl(Conf conf) {
this.conf = conf;
this.counters = new CounterMap(conf.countResetTimePeriodMs);
this.managers = new ManagerMap(conf.countLimit);
}
/**
* Allocate a byte array, where the length of the allocated array
* is the least power of two of the given length
* unless the given length is less than {@link #MIN_ARRAY_LENGTH}.
* In such case, the returned array length is equal to {@link #MIN_ARRAY_LENGTH}.
* In such case, the returned array length is equal to {@link
* #MIN_ARRAY_LENGTH}.
*
* If the number of allocated arrays exceeds the capacity,
* the current thread is blocked until
* the number of allocated arrays drops to below the capacity.
*
*
* The byte array allocated by this method must be returned for recycling
* via the {@link Impl#release(byte[])} method.
*
* @return a byte array with length larger than or equal to the given length.
* @return a byte array with length larger than or equal to the given
* length.
*/
@Override
public byte[] newByteArray(final int arrayLength) throws InterruptedException {
public byte[] newByteArray(final int arrayLength)
throws InterruptedException {
Preconditions.checkArgument(arrayLength >= 0);
if (LOG.isDebugEnabled()) {
debugMessage.get().append("allocate(").append(arrayLength).append(")");
DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")");
}
final byte[] array;
if (arrayLength == 0) {
array = EMPTY_BYTE_ARRAY;
@ -361,37 +362,40 @@ public byte[] newByteArray(final int arrayLength) throws InterruptedException {
final long count = counters.get(powerOfTwo, true).increment();
final boolean aboveThreshold = count > conf.countThreshold;
// create a new manager only if the count is above threshold.
final FixedLengthManager manager = managers.get(powerOfTwo, aboveThreshold);
final FixedLengthManager manager =
managers.get(powerOfTwo, aboveThreshold);
if (LOG.isDebugEnabled()) {
debugMessage.get().append(": count=").append(count)
DEBUG_MESSAGE.get().append(": count=").append(count)
.append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
}
array = manager != null? manager.allocate(): new byte[powerOfTwo];
}
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", return byte[").append(array.length).append("]");
DEBUG_MESSAGE.get().append(", return byte[")
.append(array.length).append("]");
logDebugMessage();
}
return array;
}
/**
* Recycle the given byte array.
*
*
* The byte array may or may not be allocated
* by the {@link Impl#newByteArray(int)} method.
*
*
* This is a non-blocking call.
*/
@Override
public int release(final byte[] array) {
Preconditions.checkNotNull(array);
if (LOG.isDebugEnabled()) {
debugMessage.get().append("recycle: array.length=").append(array.length);
DEBUG_MESSAGE.get()
.append("recycle: array.length=").append(array.length);
}
final int freeQueueSize;
if (array.length == 0) {
freeQueueSize = -1;
@ -399,18 +403,18 @@ public int release(final byte[] array) {
final FixedLengthManager manager = managers.get(array.length, false);
freeQueueSize = manager == null? -1: manager.recycle(array);
}
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", freeQueueSize=").append(freeQueueSize);
DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize);
logDebugMessage();
}
return freeQueueSize;
}
CounterMap getCounters() {
return counters;
}
ManagerMap getManagers() {
return managers;
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;

View File

@ -469,6 +469,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8911. NameNode Metric : Add Editlog counters as a JMX metric.
(Anu Engineer via Arpit Agarwal)
HDFS-8803. Move DfsClientConf to hdfs-client. (Mingliang Liu via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -91,7 +91,7 @@ public boolean getSupportsReceiptVerification() {
/**
* Injects failures into specific operations during unit tests.
*/
private final FailureInjector failureInjector;
private static FailureInjector failureInjector = new FailureInjector();
/**
* The file name, for logging and debugging purposes.
@ -187,7 +187,6 @@ public boolean getSupportsReceiptVerification() {
public BlockReaderFactory(DfsClientConf conf) {
this.conf = conf;
this.failureInjector = conf.getShortCircuitConf().brfFailureInjector;
this.remainingCacheTries = conf.getNumCachedConnRetry();
}
@ -278,6 +277,11 @@ public BlockReaderFactory setConfiguration(
return this;
}
@VisibleForTesting
public static void setFailureInjectorForTesting(FailureInjector injector) {
failureInjector = injector;
}
/**
* Build a BlockReader with the given options.
*

View File

@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
@ -138,8 +139,8 @@ public static ClientContext get(String name, DfsClientConf conf) {
*/
@VisibleForTesting
public static ClientContext getFromConf(Configuration conf) {
return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT,
DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
new DfsClientConf(conf));
}

View File

@ -18,11 +18,11 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@ -443,12 +443,12 @@ SocketAddress getRandomLocalInterfaceAddr() {
*/
int getDatanodeWriteTimeout(int numNodes) {
final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}
int getDatanodeReadTimeout(int numNodes) {
final int t = dfsClientConf.getSocketTimeout();
return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
}
@VisibleForTesting

View File

@ -44,14 +44,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size";
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_BYTES_PER_CHECKSUM_KEY =
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT =
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
public static final String DFS_USER_HOME_DIR_PREFIX_KEY = "dfs.user.home.dir.prefix";
public static final String DFS_USER_HOME_DIR_PREFIX_DEFAULT = "/user";
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
public static final String DFS_CHECKSUM_TYPE_KEY = HdfsClientConfigKeys
.DFS_CHECKSUM_TYPE_KEY;
public static final String DFS_CHECKSUM_TYPE_DEFAULT =
HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED =
HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT =
HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
@ -489,7 +495,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
public static final float DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT = 0.75f;
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";
public static final String DFS_WEB_UGI_KEY = "dfs.web.ugi";
@ -500,8 +507,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp";
public static final String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
public static final int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
public static final String
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS =
HdfsClientConfigKeys
.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
public static final int
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
HdfsClientConfigKeys
.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
public static final String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
@ -542,8 +555,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
0.6f;
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
public static final String DFS_DOMAIN_SOCKET_PATH_KEY =
HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT =
HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
@ -962,64 +977,136 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
= HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY =
HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@Deprecated
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
@Deprecated
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY =
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@Deprecated
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY =
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
@Deprecated
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY =
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
@Deprecated
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
@Deprecated
public static final String DFS_CLIENT_USE_DN_HOSTNAME =
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@Deprecated
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES =
HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
@Deprecated
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS =
HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
@Deprecated
public static final String DFS_CLIENT_CACHE_READAHEAD =
HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
@Deprecated
public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY =
HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
@Deprecated
public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
public static final String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
@Deprecated
public static final String DFS_CLIENT_CONTEXT = HdfsClientConfigKeys
.DFS_CLIENT_CONTEXT;
@Deprecated
public static final String DFS_CLIENT_CONTEXT_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
@Deprecated
public static final String
DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS =
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS;
@Deprecated
public static final int
DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT =
HdfsClientConfigKeys
.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT;
@Deprecated
public static final String
DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS =
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS;
@Deprecated
public static final int
DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT =
HdfsClientConfigKeys
.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT;
public static final String DFS_CLIENT_CONTEXT = "dfs.client.context";
public static final String DFS_CLIENT_CONTEXT_DEFAULT = "default";
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = "dfs.client.file-block-storage-locations.timeout.millis";
public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000;
public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout";
public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
@Deprecated
public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
@Deprecated
public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys.
// the initial delay (unit is ms) for locateFollowingBlock, the delay time will increase exponentially(double) for each retry.
public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
@Deprecated
public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
@Deprecated
public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = "dfs.client.use.legacy.blockreader.local";
public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
@Deprecated
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER =
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
@Deprecated
public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
@Deprecated
public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT
= HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
@Deprecated
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =
HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
@Deprecated
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
// The number of NN response dropped by client proactively in each RPC call.
// For testing NN retry cache, we can set this property with positive value.
public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
@Deprecated
public static final String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms";
public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
@Deprecated
public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
"dfs.client.key.provider.cache.expiry";
HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
@Deprecated
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10); // 10 days
HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@ -202,7 +203,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
}
if (blockSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ ") must divide block size (=" + blockSize + ").");
}
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();

View File

@ -93,7 +93,7 @@ private static void addDeprecatedKeys() {
new DeprecationDelta("dfs.secondary.http.address",
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY),
new DeprecationDelta("dfs.socket.timeout",
DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY),
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY),
new DeprecationDelta("fs.checkpoint.dir",
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY),
new DeprecationDelta("fs.checkpoint.edits.dir",
@ -127,19 +127,19 @@ private static void addDeprecatedKeys() {
new DeprecationDelta("dfs.permissions.supergroup",
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY),
new DeprecationDelta("dfs.write.packet.size",
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY),
HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY),
new DeprecationDelta("dfs.block.size",
DFSConfigKeys.DFS_BLOCK_SIZE_KEY),
new DeprecationDelta("dfs.datanode.max.xcievers",
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
new DeprecationDelta("io.bytes.per.checksum",
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY),
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY),
new DeprecationDelta("dfs.federation.nameservices",
DFSConfigKeys.DFS_NAMESERVICES),
new DeprecationDelta("dfs.federation.nameservice.id",
DFSConfigKeys.DFS_NAMESERVICE_ID),
new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
});
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -63,7 +64,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@ -318,7 +318,7 @@ private void dispatch() {
try {
sock.connect(
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);

View File

@ -279,12 +279,6 @@ static public StartupOption getEnum(String value) {
}
}
// Timeouts for communicating with DataNode for streaming writes/reads
int READ_TIMEOUT = 60 * 1000;
int READ_TIMEOUT_EXTENSION = 5 * 1000;
int WRITE_TIMEOUT = 8 * 60 * 1000;
int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
/**
* Defines the NameNode role.
*/

View File

@ -27,11 +27,11 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@ -55,9 +55,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.security.SaslPropertiesResolver;
/**
@ -107,9 +108,9 @@ public class DNConf {
public DNConf(Configuration conf) {
this.conf = conf;
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
HdfsConstants.READ_TIMEOUT);
socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
HdfsConstants.WRITE_TIMEOUT);
socketKeepaliveTimeout = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
@ -149,8 +150,8 @@ public DNConf(Configuration conf) {
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
this.dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
this.datanodeSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);

View File

@ -145,7 +145,6 @@
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -921,8 +920,8 @@ private void initDataXceiver(Configuration conf) throws IOException {
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
conf.getBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
@ -943,8 +942,8 @@ private static DomainPeerServer getDomainPeerServer(Configuration conf,
if (domainSocketPath.isEmpty()) {
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) &&
(!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
(!conf.getBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
LOG.warn("Although short-circuit local reads are configured, " +
"they are disabled because you didn't configure " +
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
@ -2141,7 +2140,7 @@ public void run() {
}
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
DataEncryptionKeyFactory keyFactory =

View File

@ -72,7 +72,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
@ -705,9 +704,9 @@ public void writeBlock(final ExtendedBlock block,
mirrorSock = datanode.newSocket();
try {
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
+ (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -94,7 +94,7 @@ public static SecureResources getSecureResources(Configuration conf)
InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
int socketWriteTimeout = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
HdfsConstants.WRITE_TIMEOUT);
ServerSocket ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();

View File

@ -24,12 +24,12 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
@ -71,7 +72,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.net.NetUtils;
@ -813,7 +813,7 @@ private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
} catch (IOException ie) {
if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
if (failures >= HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
throw new IOException("Could not obtain block " + lblock, ie);
}
LOG.info("Could not obtain block from any node: " + ie);
@ -849,8 +849,8 @@ public Peer newConnectedPeer(InetSocketAddress addr,
Peer peer = null;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
s.connect(addr, HdfsConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocketAndKey(
dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
blockToken, datanodeId);

View File

@ -27,7 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.util.PerformanceAdvisory;
@ -112,7 +112,7 @@ public DomainSocketFactory(ShortCircuitConf conf) {
} else {
if (conf.getDomainSocketPath().isEmpty()) {
throw new HadoopIllegalArgumentException(feature + " is enabled but "
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
+ HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
} else if (DomainSocket.getLoadingFailureReason() != null) {
LOG.warn(feature + " cannot be used because "
+ DomainSocket.getLoadingFailureReason());

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fi.FiTestUtil;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiverAspects;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -226,11 +227,11 @@ private static void setConfiguration() {
conf = new Configuration();
int customPerChecksumSize = 700;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
}
private static void initLoggers() {

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.log4j.Level;
@ -52,7 +53,7 @@ public class TestFiDataTransferProtocol {
static {
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
}
static private FSDataOutputStream createFile(FileSystem fs, Path p

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.Assert;
@ -57,8 +58,8 @@ public class TestFiDataTransferProtocol2 {
static {
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
}
static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];

View File

@ -337,7 +337,7 @@ public void testZeroCopyMmapCache() throws Exception {
ByteBuffer results[] = { null, null, null, null };
DistributedFileSystem fs = null;
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -599,7 +599,7 @@ public void testZeroCopyReadOfCachedData() throws Exception {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false);
final String CONTEXT = "testZeroCopyReadOfCachedData";
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH,
(int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize()));
@ -722,7 +722,7 @@ public void testClientMmapDisable() throws Exception {
final String CONTEXT = "testClientMmapDisable";
FSDataInputStream fsIn = null;
DistributedFileSystem fs = null;
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
try {
// With HdfsClientConfigKeys.Mmap.ENABLED_KEY set to false,
@ -753,7 +753,7 @@ public void testClientMmapDisable() throws Exception {
// Now try again with HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY == 0.
conf.setBoolean(HdfsClientConfigKeys.Mmap.ENABLED_KEY, true);
conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 0);
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -784,7 +784,7 @@ public void test2GBMmapLimit() throws Exception {
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final String CONTEXT = "test2GBMmapLimit";
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
FSDataInputStream fsIn = null, fsIn2 = null;
ByteBuffer buf1 = null, buf2 = null;

View File

@ -20,7 +20,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -42,7 +41,7 @@ public void testUnbufferClosesSockets() throws Exception {
Configuration conf = new Configuration();
// Set a new ClientContext. This way, we will have our own PeerCache,
// rather than sharing one with other unit tests.
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
"testUnbufferClosesSocketsContext");
// Disable short-circuit reads. With short-circuit, we wouldn't hold open a
@ -50,9 +49,9 @@ public void testUnbufferClosesSockets() throws Exception {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
// Set a really long socket timeout to avoid test timing issues.
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
100000000L);
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
100000000L);
MiniDFSCluster cluster = null;

View File

@ -36,10 +36,10 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@ -202,8 +202,8 @@ public Peer newConnectedPeer(InetSocketAddress addr,
Socket sock = NetUtils.
getDefaultSocketFactory(fs.getConf()).createSocket();
try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocket(sock);
} finally {
if (peer == null) {

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -52,9 +53,9 @@ public class FileAppendTest4 {
private static DistributedFileSystem fs;
private static void init(Configuration conf) {
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BYTES_PER_CHECKSUM);
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BYTES_PER_CHECKSUM);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
}
@BeforeClass

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
import static org.hamcrest.CoreMatchers.equalTo;

View File

@ -126,10 +126,10 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
!checksum);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
test.setConfiguration(conf);
FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a");

View File

@ -64,12 +64,12 @@ private static HdfsConfiguration getConfiguration(
getAbsolutePath());
}
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
return conf;
@ -164,7 +164,7 @@ public void testBothOldAndNewShortCircuitConfigured() throws Exception {
public void testBlockReaderLocalLegacyWithAppend() throws Exception {
final short REPL_FACTOR = 1;
final HdfsConfiguration conf = getConfiguration(null);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -169,7 +169,7 @@ public void testPipelineRecoveryForLastBlock() throws IOException {
@Test
public void testPipelineRecoveryOnOOB() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
conf.set(HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 1;
@ -207,7 +207,7 @@ public void testPipelineRecoveryOnOOB() throws Exception {
@Test
public void testPipelineRecoveryOnRestartFailure() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5");
conf.set(HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 2;

View File

@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.Assert;
import org.junit.Test;
@ -89,8 +90,8 @@ public void testReadFromOneDN() throws Exception {
// instances. Also use a really long socket timeout so that nothing
// gets closed before we get around to checking the cache size at the end.
final String contextName = "testReadFromOneDNContext";
configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, contextName);
configuration.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
configuration.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, contextName);
configuration.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
100000000L);
BlockReaderTestUtil util = new BlockReaderTestUtil(1, configuration);
final Path testFile = new Path("/testConnCache.dat");

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -177,11 +177,11 @@ public void testWriteTimeoutAtDataNode() throws IOException,
InterruptedException {
final int writeTimeout = 100; //milliseconds.
// set a very short write timeout for datanode, so that tests runs fast.
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout);
conf.setInt(HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout);
// set a smaller block size
final int blockSize = 10*1024*1024;
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 1);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 1);
// set a small buffer size
final int bufferSize = 4096;
conf.setInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, bufferSize);
@ -638,10 +638,11 @@ private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, in
short replicationFactor = 1;
long blockSize = 128*1024*1024; // DFS block size
int bufferSize = 4096;
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
retries);
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
xcievers);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
retries);
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, timeWin);
// Disable keepalive
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 0);

View File

@ -73,7 +73,7 @@ private void testSkipInner(MiniDFSCluster cluster) throws IOException {
@Test(timeout=60000)
public void testSkipWithRemoteBlockReader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
testSkipInner(cluster);

View File

@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@ -58,7 +59,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
@ -108,8 +108,8 @@ private void sendRecvData(String testDescription,
StringUtils.byteToHexString(sendBuf.toByteArray()));
sock = new Socket();
sock.connect(dnAddr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
OutputStream out = sock.getOutputStream();
// Should we excuse

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@ -287,7 +288,7 @@ private void complexTest() throws IOException {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).build();
cluster.waitActive();
@ -343,7 +344,7 @@ private void simpleTest(int datanodeToKill) throws IOException {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
int myMaxNodes = 5;
System.out.println("SimpleTest starting with DataNode to Kill " +
datanodeToKill);

View File

@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.Test;
/**
@ -46,7 +47,7 @@ public void testDisableCache() throws Exception {
// Configure a new instance with no peer caching, ensure that it doesn't
// cache anything
confWithoutCache.setInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache);
final Path testFile = new Path("/testConnCache.dat");
util.writeFile(testFile, FILE_SIZE / 1024);

View File

@ -572,7 +572,7 @@ public FileSystem run() throws Exception {
final Path dir = new Path("/filechecksum");
final int block_size = 1024;
final int buffer_size = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
//try different number of blocks
for(int n = 0; n < 5; n++) {
@ -1075,7 +1075,7 @@ public void testListFiles() throws IOException {
public void testDFSClientPeerReadTimeout() throws IOException {
final int timeout = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -1120,7 +1120,7 @@ public void testGetServerDefaults() throws IOException {
public void testDFSClientPeerWriteTimeout() throws IOException {
final int timeout = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

View File

@ -28,13 +28,13 @@
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@ -489,8 +489,8 @@ private void testComplexAppend(boolean appendToNewBlock) throws IOException {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 30000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 30000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 30000);
conf.setInt(HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 30000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -84,7 +85,7 @@ public void setUp() throws Exception {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
// handle under-replicated blocks quickly (for replication asserts)
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
@ -339,7 +340,7 @@ public void testAppendInsufficientLocations() throws Exception {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
.build();

View File

@ -20,10 +20,10 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
@ -66,6 +66,7 @@
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -220,7 +221,7 @@ public void checkFileCreation(String netIf, boolean useDnHostname)
if (netIf != null) {
conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
}
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
if (useDnHostname) {
// Since the mini cluster only listens on the loopback we have to
// ensure the hostname used to access DNs maps to the loopback. We

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -386,7 +387,7 @@ public void testPipelineHeartbeat() throws Exception {
final int fileLen = 6;
Configuration conf = new HdfsConfiguration();
final int timeout = 2000;
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
timeout);
final Path p = new Path("/pipelineHeartbeat/foo");

View File

@ -28,7 +28,7 @@ static public void setupCluster() throws Exception {
// off both short-circuit local reads and UNIX domain socket data traffic.
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
false);
// dfs.domain.socket.path should be ignored because the previous two keys
// were set to false. This is a regression test for HDFS-4473.

View File

@ -29,8 +29,8 @@ static public void setupCluster() throws Exception {
DFSInputStream.tcpReadsDisabledForTesting = true;
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false);

View File

@ -51,13 +51,13 @@ static public void setupCluster() throws Exception {
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false);
conf.setBoolean(DFSConfigKeys.
conf.setBoolean(HdfsClientConfigKeys.
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
// We want to test reading from stale sockets.
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
5 * 60 * 1000);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
// Avoid using the FileInputStreamCache.
conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
0);

View File

@ -41,7 +41,7 @@ static public void setupCluster() throws Exception {
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
DomainSocket.disableBindPathValidation();
setupCluster(1, conf);
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -148,11 +149,11 @@ private static void setConfiguration() {
conf = new Configuration();
int customPerChecksumSize = 700;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100);
conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0);
}
private static void initLoggers() {

View File

@ -212,7 +212,7 @@ private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
return;
}
int numBlocks = 1;
assertTrue(numBlocks <= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
assertTrue(numBlocks <= HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
byte[] expected = new byte[numBlocks * blockSize];
Random rand = new Random(seed);
rand.nextBytes(expected);

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@ -58,7 +59,7 @@ public void testEOFWithBlockReaderLocal() throws Exception {
new ShortCircuitTestContext("testEOFWithBlockReaderLocal");
try {
final Configuration conf = testContext.newConfiguration();
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
testEOF(cluster, 1);
@ -73,7 +74,7 @@ public void testEOFWithBlockReaderLocal() throws Exception {
@Test(timeout=60000)
public void testEOFWithRemoteBlockReader() throws Exception {
final Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
testEOF(cluster, 1);

View File

@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
public class TestRemoteBlockReader extends TestBlockReaderBase {
HdfsConfiguration createConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
return conf;
}
}

View File

@ -50,13 +50,13 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -169,8 +169,8 @@ public Peer newConnectedPeer(InetSocketAddress addr,
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocket(sock);
} finally {
if (peer == null) {

View File

@ -45,8 +45,10 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -55,7 +57,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@ -106,7 +107,7 @@ public void testBlockReplacement() throws Exception {
final Random r = new Random();
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE/2);
CONF.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE/2);
CONF.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,500);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(REPLICATION_FACTOR)
.racks(INITIAL_RACKS).build();
@ -325,7 +326,7 @@ private boolean replaceBlock(
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.IOUtils;
@ -49,7 +50,7 @@
public class TestCachingStrategy {
private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
private static final int MAX_TEST_FILE_LEN = 1024 * 1024;
private static final int WRITE_PACKET_SIZE = DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
private static final int WRITE_PACKET_SIZE = HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
private final static TestRecordingCacheTracker tracker =
new TestRecordingCacheTracker();
@ -259,8 +260,8 @@ public void testClientDefaults() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;

View File

@ -54,11 +54,11 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -523,8 +523,8 @@ public Peer newConnectedPeer(InetSocketAddress addr,
Peer peer = null;
Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
try {
sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
sock.connect(addr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
peer = TcpPeerServer.peerFromSocket(sock);
} finally {
if (peer == null) {

View File

@ -264,11 +264,12 @@ protected final void startUpCluster(
if (useSCR) {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
// Do not share a client context across tests.
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
if (useLegacyBlockReaderLocal) {
conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
} else {
sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -53,7 +54,7 @@ public class TestDatanodeRestart {
// bring up a cluster of 3
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
@ -77,7 +78,7 @@ public class TestDatanodeRestart {
public void testRbwReplicas() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
try {
@ -146,7 +147,7 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
@Test public void testRecoverReplicas() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024L);
conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 512);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
try {

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hdfs.shortcircuit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
@ -71,6 +71,7 @@
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -671,8 +672,8 @@ public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
// The second read should fail, and we should only have 1 segment and 1 slot
// left.
fs.getClient().getConf().getShortCircuitConf().brfFailureInjector =
new TestCleanupFailureInjector();
BlockReaderFactory.setFailureInjectorForTesting(
new TestCleanupFailureInjector());
try {
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
} catch (Throwable t) {
@ -766,8 +767,8 @@ public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
fs.getClient().getConf().getShortCircuitConf().brfFailureInjector =
new TestPreReceiptVerificationFailureInjector();
BlockReaderFactory.setFailureInjectorForTesting(
new TestPreReceiptVerificationFailureInjector());
final Path TEST_PATH1 = new Path("/test_file1");
DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
final Path TEST_PATH2 = new Path("/test_file2");

View File

@ -253,7 +253,7 @@ public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
ignoreChecksum);
// Set a random client context name so that we don't share a cache with
// other invocations of this function.
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
UUID.randomUUID().toString());
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
@ -261,7 +261,7 @@ public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
if (shortCircuitUser != null) {
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
shortCircuitUser);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
}
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
@ -592,7 +592,7 @@ public void testReadWithRemoteBlockReader() throws IOException, InterruptedExcep
public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum, int size, String shortCircuitUser,
int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)