HDFS-8083. Move dfs.client.write.* conf from DFSConfigKeys to HdfsClientConfigKeys.Write.
This commit is contained in:
parent
944a16579f
commit
7fc50e2525
|
@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs.client;
|
||||||
|
|
||||||
/** Client configuration properties */
|
/** Client configuration properties */
|
||||||
public interface HdfsClientConfigKeys {
|
public interface HdfsClientConfigKeys {
|
||||||
|
long SECOND = 1000L;
|
||||||
|
long MINUTE = 60 * SECOND;
|
||||||
|
|
||||||
String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
|
String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
|
||||||
long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
|
long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
|
||||||
String DFS_REPLICATION_KEY = "dfs.replication";
|
String DFS_REPLICATION_KEY = "dfs.replication";
|
||||||
|
@ -65,7 +68,50 @@ public interface HdfsClientConfigKeys {
|
||||||
int CONNECTION_RETRIES_DEFAULT = 0;
|
int CONNECTION_RETRIES_DEFAULT = 0;
|
||||||
String CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = PREFIX + "connection.retries.on.timeouts";
|
String CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = PREFIX + "connection.retries.on.timeouts";
|
||||||
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Write {
|
||||||
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "write.";
|
||||||
|
|
||||||
|
String MAX_PACKETS_IN_FLIGHT_KEY = PREFIX + "max-packets-in-flight";
|
||||||
|
int MAX_PACKETS_IN_FLIGHT_DEFAULT = 80;
|
||||||
|
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY = PREFIX + "exclude.nodes.cache.expiry.interval.millis";
|
||||||
|
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
|
||||||
|
|
||||||
|
interface ByteArrayManager {
|
||||||
|
String PREFIX = Write.PREFIX + "byte-array-manager.";
|
||||||
|
|
||||||
|
String ENABLED_KEY = PREFIX + "enabled";
|
||||||
|
boolean ENABLED_DEFAULT = false;
|
||||||
|
String COUNT_THRESHOLD_KEY = PREFIX + "count-threshold";
|
||||||
|
int COUNT_THRESHOLD_DEFAULT = 128;
|
||||||
|
String COUNT_LIMIT_KEY = PREFIX + "count-limit";
|
||||||
|
int COUNT_LIMIT_DEFAULT = 2048;
|
||||||
|
String COUNT_RESET_TIME_PERIOD_MS_KEY = PREFIX + "count-reset-time-period-ms";
|
||||||
|
long COUNT_RESET_TIME_PERIOD_MS_DEFAULT = 10*SECOND;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface BlockWrite {
|
||||||
|
String PREFIX = HdfsClientConfigKeys.PREFIX + "block.write.";
|
||||||
|
|
||||||
|
String RETRIES_KEY = PREFIX + "retries";
|
||||||
|
int RETRIES_DEFAULT = 3;
|
||||||
|
String LOCATEFOLLOWINGBLOCK_RETRIES_KEY = PREFIX + "locateFollowingBlock.retries";
|
||||||
|
int LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT = 5;
|
||||||
|
String LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY = PREFIX + "locateFollowingBlock.initial.delay.ms";
|
||||||
|
int LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT = 400;
|
||||||
|
|
||||||
|
interface ReplaceDatanodeOnFailure {
|
||||||
|
String PREFIX = BlockWrite.PREFIX + "replace-datanode-on-failure.";
|
||||||
|
|
||||||
|
String ENABLE_KEY = PREFIX + "enable";
|
||||||
|
boolean ENABLE_DEFAULT = true;
|
||||||
|
String POLICY_KEY = PREFIX + "policy";
|
||||||
|
String POLICY_DEFAULT = "DEFAULT";
|
||||||
|
String BEST_EFFORT_KEY = PREFIX + "best-effort";
|
||||||
|
boolean BEST_EFFORT_DEFAULT = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** HTTP client configuration properties */
|
/** HTTP client configuration properties */
|
||||||
|
|
|
@ -39,7 +39,7 @@ abstract class EnumSetParam<E extends Enum<E>> extends Param<EnumSet<E>, EnumSet
|
||||||
}
|
}
|
||||||
|
|
||||||
static <E extends Enum<E>> EnumSet<E> toEnumSet(final Class<E> clazz,
|
static <E extends Enum<E>> EnumSet<E> toEnumSet(final Class<E> clazz,
|
||||||
final E... values) {
|
final E[] values) {
|
||||||
final EnumSet<E> set = EnumSet.noneOf(clazz);
|
final EnumSet<E> set = EnumSet.noneOf(clazz);
|
||||||
set.addAll(Arrays.asList(values));
|
set.addAll(Arrays.asList(values));
|
||||||
return set;
|
return set;
|
||||||
|
|
|
@ -428,6 +428,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-7933. fsck should also report decommissioning replicas.
|
HDFS-7933. fsck should also report decommissioning replicas.
|
||||||
(Xiaoyu Yao via cnauroth)
|
(Xiaoyu Yao via cnauroth)
|
||||||
|
|
||||||
|
HDFS-8083. Move dfs.client.write.* conf from DFSConfigKeys to
|
||||||
|
HdfsClientConfigKeys.Write. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -761,37 +761,87 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
|
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT
|
||||||
= HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
|
= HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT;
|
||||||
|
|
||||||
|
// client write confs are moved to HdfsClientConfigKeys.Write
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY
|
||||||
|
= HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL
|
||||||
|
= HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; // 10 minutes, in ms
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT;
|
||||||
|
|
||||||
|
// client block.write confs are moved to HdfsClientConfigKeys.BlockWrite
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.RETRIES_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.RETRIES_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT;
|
||||||
|
@Deprecated
|
||||||
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY;
|
||||||
|
@Deprecated
|
||||||
|
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT
|
||||||
|
= HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = "dfs.client.write.max-packets-in-flight";
|
|
||||||
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT = 80;
|
|
||||||
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
|
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
|
||||||
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
||||||
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY
|
|
||||||
= "dfs.client.write.byte-array-manager.enabled";
|
|
||||||
public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT
|
|
||||||
= false;
|
|
||||||
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY
|
|
||||||
= "dfs.client.write.byte-array-manager.count-threshold";
|
|
||||||
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT
|
|
||||||
= 128;
|
|
||||||
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY
|
|
||||||
= "dfs.client.write.byte-array-manager.count-limit";
|
|
||||||
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT
|
|
||||||
= 2048;
|
|
||||||
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY
|
|
||||||
= "dfs.client.write.byte-array-manager.count-reset-time-period-ms";
|
|
||||||
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
|
|
||||||
= 10L * 1000;
|
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
|
|
||||||
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
|
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
|
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
|
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY = "dfs.client.block.write.replace-datanode-on-failure.best-effort";
|
|
||||||
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = false;
|
|
||||||
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
|
|
||||||
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
|
|
||||||
|
|
||||||
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
|
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
|
||||||
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
|
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
|
||||||
|
@ -822,13 +872,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
|
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
|
||||||
public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
|
public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
|
||||||
// Much code in hdfs is not yet updated to use these keys.
|
// Much code in hdfs is not yet updated to use these keys.
|
||||||
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
|
|
||||||
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT = 5;
|
|
||||||
// the initial delay (unit is ms) for locateFollowingBlock, the delay time will increase exponentially(double) for each retry.
|
// the initial delay (unit is ms) for locateFollowingBlock, the delay time will increase exponentially(double) for each retry.
|
||||||
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY = "dfs.client.block.write.locateFollowingBlock.initial.delay.ms";
|
|
||||||
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT = 400;
|
|
||||||
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY = "dfs.client.block.write.retries";
|
|
||||||
public static final int DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3;
|
|
||||||
public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
|
public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
|
||||||
public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
|
public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
|
@ -1029,7 +1030,7 @@ class DataStreamer extends Daemon {
|
||||||
.append("The current failed datanode replacement policy is ")
|
.append("The current failed datanode replacement policy is ")
|
||||||
.append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
|
.append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
|
||||||
.append("a client may configure this via '")
|
.append("a client may configure this via '")
|
||||||
.append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
|
.append(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
|
||||||
.append("' in its configuration.")
|
.append("' in its configuration.")
|
||||||
.toString());
|
.toString());
|
||||||
}
|
}
|
||||||
|
@ -1240,7 +1241,7 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
DFSClient.LOG.warn("Failed to replace datanode."
|
DFSClient.LOG.warn("Failed to replace datanode."
|
||||||
+ " Continue with the remaining datanodes since "
|
+ " Continue with the remaining datanodes since "
|
||||||
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
|
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
|
||||||
+ " is set to true.", ioe);
|
+ " is set to true.", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,12 +21,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
|
||||||
|
@ -41,10 +35,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIR
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||||
|
@ -148,27 +138,28 @@ public class DfsClientConf {
|
||||||
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||||
HdfsServerConstants.READ_TIMEOUT);
|
HdfsServerConstants.READ_TIMEOUT);
|
||||||
/** dfs.write.packet.size is an internal config variable */
|
/** dfs.write.packet.size is an internal config variable */
|
||||||
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
writePacketSize = conf.getInt(
|
||||||
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||||
writeMaxPackets = conf.getInt(
|
writeMaxPackets = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
|
HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
||||||
|
|
||||||
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
|
HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
|
HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_DEFAULT);
|
||||||
if (!byteArrayManagerEnabled) {
|
if (!byteArrayManagerEnabled) {
|
||||||
writeByteArrayManagerConf = null;
|
writeByteArrayManagerConf = null;
|
||||||
} else {
|
} else {
|
||||||
final int countThreshold = conf.getInt(
|
final int countThreshold = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
|
||||||
final int countLimit = conf.getInt(
|
final int countLimit = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
|
||||||
final long countResetTimePeriodMs = conf.getLong(
|
final long countResetTimePeriodMs = conf.getLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
||||||
writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
||||||
countThreshold, countLimit, countResetTimePeriodMs);
|
countThreshold, countLimit, countResetTimePeriodMs);
|
||||||
}
|
}
|
||||||
|
@ -179,20 +170,21 @@ public class DfsClientConf {
|
||||||
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
|
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
|
||||||
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
||||||
excludedNodesCacheExpiry = conf.getLong(
|
excludedNodesCacheExpiry = conf.getLong(
|
||||||
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
|
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
||||||
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
||||||
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
||||||
10 * defaultBlockSize);
|
10 * defaultBlockSize);
|
||||||
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
||||||
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
||||||
numBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
|
numBlockWriteRetry = conf.getInt(
|
||||||
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.RETRIES_KEY,
|
||||||
|
HdfsClientConfigKeys.BlockWrite.RETRIES_DEFAULT);
|
||||||
numBlockWriteLocateFollowingRetry = conf.getInt(
|
numBlockWriteLocateFollowingRetry = conf.getInt(
|
||||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
||||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
||||||
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
||||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
||||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
|
||||||
uMask = FsPermission.getUMask(conf);
|
uMask = FsPermission.getUMask(conf);
|
||||||
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
||||||
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,7 +115,7 @@ public class ReplaceDatanodeOnFailure {
|
||||||
if (policy == Policy.DISABLE) {
|
if (policy == Policy.DISABLE) {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"This feature is disabled. Please refer to "
|
"This feature is disabled. Please refer to "
|
||||||
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
|
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY
|
||||||
+ " configuration property.");
|
+ " configuration property.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -156,23 +156,23 @@ public class ReplaceDatanodeOnFailure {
|
||||||
public static ReplaceDatanodeOnFailure get(final Configuration conf) {
|
public static ReplaceDatanodeOnFailure get(final Configuration conf) {
|
||||||
final Policy policy = getPolicy(conf);
|
final Policy policy = getPolicy(conf);
|
||||||
final boolean bestEffort = conf.getBoolean(
|
final boolean bestEffort = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT);
|
||||||
|
|
||||||
return new ReplaceDatanodeOnFailure(policy, bestEffort);
|
return new ReplaceDatanodeOnFailure(policy, bestEffort);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Policy getPolicy(final Configuration conf) {
|
private static Policy getPolicy(final Configuration conf) {
|
||||||
final boolean enabled = conf.getBoolean(
|
final boolean enabled = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT);
|
||||||
if (!enabled) {
|
if (!enabled) {
|
||||||
return Policy.DISABLE;
|
return Policy.DISABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
final String policy = conf.get(
|
final String policy = conf.get(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT);
|
||||||
for(int i = 1; i < Policy.values().length; i++) {
|
for(int i = 1; i < Policy.values().length; i++) {
|
||||||
final Policy p = Policy.values()[i];
|
final Policy p = Policy.values()[i];
|
||||||
if (p.name().equalsIgnoreCase(policy)) {
|
if (p.name().equalsIgnoreCase(policy)) {
|
||||||
|
@ -180,7 +180,7 @@ public class ReplaceDatanodeOnFailure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new HadoopIllegalArgumentException("Illegal configuration value for "
|
throw new HadoopIllegalArgumentException("Illegal configuration value for "
|
||||||
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
|
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY
|
||||||
+ ": " + policy);
|
+ ": " + policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,13 +188,13 @@ public class ReplaceDatanodeOnFailure {
|
||||||
public static void write(final Policy policy,
|
public static void write(final Policy policy,
|
||||||
final boolean bestEffort, final Configuration conf) {
|
final boolean bestEffort, final Configuration conf) {
|
||||||
conf.setBoolean(
|
conf.setBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
|
||||||
policy != Policy.DISABLE);
|
policy != Policy.DISABLE);
|
||||||
conf.set(
|
conf.set(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
|
||||||
policy.name());
|
policy.name());
|
||||||
conf.setBoolean(
|
conf.setBoolean(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
|
||||||
bestEffort);
|
bestEffort);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -23,18 +23,16 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests pipeline recovery related client protocol works correct or not.
|
* This tests pipeline recovery related client protocol works correct or not.
|
||||||
|
@ -130,7 +128,7 @@ public class TestClientProtocolForPipelineRecovery {
|
||||||
DFSClientFaultInjector.instance = faultInjector;
|
DFSClientFaultInjector.instance = faultInjector;
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
|
conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -147,7 +145,7 @@ public class TestClientProtocolForPipelineRecovery {
|
||||||
// Read should succeed.
|
// Read should succeed.
|
||||||
FSDataInputStream in = fileSys.open(file);
|
FSDataInputStream in = fileSys.open(file);
|
||||||
try {
|
try {
|
||||||
int c = in.read();
|
in.read();
|
||||||
// Test will fail with BlockMissingException if NN does not update the
|
// Test will fail with BlockMissingException if NN does not update the
|
||||||
// replica state based on the latest report.
|
// replica state based on the latest report.
|
||||||
} catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
|
} catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
|
||||||
|
|
|
@ -21,15 +21,14 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.util.ThreadUtil;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -87,13 +86,12 @@ public class TestDFSClientExcludedNodes {
|
||||||
public void testExcludedNodesForgiveness() throws IOException {
|
public void testExcludedNodesForgiveness() throws IOException {
|
||||||
// Forgive nodes in under 2.5s for this test case.
|
// Forgive nodes in under 2.5s for this test case.
|
||||||
conf.setLong(
|
conf.setLong(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
|
HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
||||||
2500);
|
2500);
|
||||||
// We'll be using a 512 bytes block size just for tests
|
// We'll be using a 512 bytes block size just for tests
|
||||||
// so making sure the checksum bytes too match it.
|
// so making sure the checksum bytes too match it.
|
||||||
conf.setInt("io.bytes.per.checksum", 512);
|
conf.setInt("io.bytes.per.checksum", 512);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
List<DataNodeProperties> props = cluster.dataNodes;
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
Path filePath = new Path("/testForgivingExcludedNodes");
|
Path filePath = new Path("/testForgivingExcludedNodes");
|
||||||
|
|
||||||
|
|
|
@ -228,7 +228,7 @@ public class TestDFSClientRetries {
|
||||||
{
|
{
|
||||||
final String exceptionMsg = "Nope, not replicated yet...";
|
final String exceptionMsg = "Nope, not replicated yet...";
|
||||||
final int maxRetries = 1; // Allow one retry (total of two calls)
|
final int maxRetries = 1; // Allow one retry (total of two calls)
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
|
conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
|
||||||
|
|
||||||
NamenodeProtocols mockNN = mock(NamenodeProtocols.class);
|
NamenodeProtocols mockNN = mock(NamenodeProtocols.class);
|
||||||
Answer<Object> answer = new ThrowsException(new IOException()) {
|
Answer<Object> answer = new ThrowsException(new IOException()) {
|
||||||
|
@ -1133,7 +1133,7 @@ public class TestDFSClientRetries {
|
||||||
@Test
|
@Test
|
||||||
public void testDFSClientConfigurationLocateFollowingBlockInitialDelay()
|
public void testDFSClientConfigurationLocateFollowingBlockInitialDelay()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
// test if DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY
|
// test if HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY
|
||||||
// is not configured, verify DFSClient uses the default value 400.
|
// is not configured, verify DFSClient uses the default value 400.
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
try {
|
try {
|
||||||
|
@ -1143,10 +1143,11 @@ public class TestDFSClientRetries {
|
||||||
assertEquals(client.getConf().
|
assertEquals(client.getConf().
|
||||||
getBlockWriteLocateFollowingInitialDelayMs(), 400);
|
getBlockWriteLocateFollowingInitialDelayMs(), 400);
|
||||||
|
|
||||||
// change DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
|
// change HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
|
||||||
// verify DFSClient uses the configured value 1000.
|
// verify DFSClient uses the configured value 1000.
|
||||||
conf.setInt(DFSConfigKeys.
|
conf.setInt(
|
||||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY, 1000);
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
||||||
|
1000);
|
||||||
client = new DFSClient(null, nn, conf, null);
|
client = new DFSClient(null, nn, conf, null);
|
||||||
assertEquals(client.getConf().
|
assertEquals(client.getConf().
|
||||||
getBlockWriteLocateFollowingInitialDelayMs(), 1000);
|
getBlockWriteLocateFollowingInitialDelayMs(), 1000);
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
@ -169,7 +170,7 @@ public class TestNamenodeCapacityReport {
|
||||||
public void testXceiverCount() throws Exception {
|
public void testXceiverCount() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
// retry one time, if close fails
|
// retry one time, if close fails
|
||||||
conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
|
conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
final int nodes = 8;
|
final int nodes = 8;
|
||||||
|
|
|
@ -34,8 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
|
||||||
|
@ -578,9 +578,9 @@ public class TestByteArrayManager {
|
||||||
new ByteArrayManager.NewByteArrayWithoutLimit(),
|
new ByteArrayManager.NewByteArrayWithoutLimit(),
|
||||||
new NewByteArrayWithLimit(maxArrays),
|
new NewByteArrayWithLimit(maxArrays),
|
||||||
new ByteArrayManager.Impl(new ByteArrayManager.Conf(
|
new ByteArrayManager.Impl(new ByteArrayManager.Conf(
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT,
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT,
|
||||||
maxArrays,
|
maxArrays,
|
||||||
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
|
HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
|
||||||
};
|
};
|
||||||
final double[] avg = new double[impls.length];
|
final double[] avg = new double[impls.length];
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue