HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable.
Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
parent 1c52b6551b
commit 4aae0fe976
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -619,6 +619,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support
     FSIMAGE_NAME_OPTIMIZATION. (szetszwo)
 
+    HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
+
   BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
 
     HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
-    .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
@@ -59,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -89,10 +88,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
@@ -107,7 +106,9 @@ import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CacheFlag;
@@ -135,8 +136,8 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -217,6 +218,10 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -294,6 +299,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     final int ioBufferSize;
     final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
+    final int writeMaxPackets;
     final int socketTimeout;
     final int socketCacheCapacity;
     final long socketCacheExpiry;
@@ -364,6 +370,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       /** dfs.write.packet.size is an internal config variable */
       writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
           DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+      writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
+          DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
       defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
           DFS_BLOCK_SIZE_DEFAULT);
       defaultReplication = (short) conf.getInt(
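A quick sanity check on the defaults carried over here: with dfs.client-write-packet-size at its default of 64 KB and dfs.client.write.max-packets at the new default of 80, a single output stream can hold at most roughly 80 x 64 KB = 5 MB of packet data across its data and ack queues, which matches the comment on the hard-coded constant removed from DFSOutputStream further down.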
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -50,6 +50,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
   public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
+  public static final String DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets";
+  public static final int DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80;
   public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
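To illustrate how the new key could be used once this change is in, the following is a minimal, hypothetical client sketch (not part of this commit) that raises the limit through the standard Configuration/FileSystem APIs; the NameNode URI, path, and class name are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteMaxPacketsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow up to 160 outstanding packets per stream instead of the default 80.
    conf.setInt("dfs.client.write.max-packets", 160);

    // Placeholder NameNode address; adjust for a real cluster.
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/example.bin"));
    try {
      out.write(new byte[1024 * 1024]); // write 1 MB of zeros
    } finally {
      out.close();
      fs.close();
    }
  }
}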
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -130,7 +130,6 @@ import com.google.common.cache.RemovalNotification;
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
   private final long dfsclientSlowLogThresholdMs;
-  private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   /**
    * Number of times to retry creating a file when there are transient
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -1783,7 +1782,7 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
         // If queue is full, then wait till we have enough space
-        while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
+        while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
           try {
             dataQueue.wait();
           } catch (InterruptedException e) {
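For context on the loop above: the writer thread blocks on dataQueue until enough packets have been sent and acknowledged to bring the combined queue size back under the configured limit, and the streamer/response side notifies it as packets drain. Below is a minimal, self-contained sketch of that wait/notify pattern, written for illustration only; it does not reuse DataStreamer's actual types or names.

import java.util.LinkedList;
import java.util.Queue;

/** Illustrative bounded handoff between a writer thread and a streamer thread. */
class BoundedPacketQueue<T> {
  private final Queue<T> dataQueue = new LinkedList<T>(); // packets waiting to be sent
  private final Queue<T> ackQueue = new LinkedList<T>();  // packets sent, awaiting acks
  private final int maxPackets; // analogous to dfs.client.write.max-packets

  BoundedPacketQueue(int maxPackets) {
    this.maxPackets = maxPackets;
  }

  /** Writer side: blocks while too many packets are outstanding. */
  synchronized void enqueue(T packet) throws InterruptedException {
    while (dataQueue.size() + ackQueue.size() >= maxPackets) {
      wait(); // woken by ackReceived() when a slot frees up
    }
    dataQueue.add(packet);
    notifyAll(); // wake a streamer waiting for work
  }

  /** Streamer side: takes the next packet to send and parks it on the ack queue. */
  synchronized T nextToSend() throws InterruptedException {
    while (dataQueue.isEmpty()) {
      wait();
    }
    T packet = dataQueue.remove();
    ackQueue.add(packet);
    return packet;
  }

  /** Response side: an ack arrived, so free a slot and unblock the writer. */
  synchronized void ackReceived() {
    if (!ackQueue.isEmpty()) {
      ackQueue.remove();
    }
    notifyAll();
  }
}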