HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and provide a mechanism for recycling arrays.
Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
This commit is contained in:
parent
0cbc115704
commit
bac064de62
|
@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-7313. Support optional configuration of AES cipher suite on
|
HDFS-7313. Support optional configuration of AES cipher suite on
|
||||||
DataTransferProtocol. (cnauroth)
|
DataTransferProtocol. (cnauroth)
|
||||||
|
|
||||||
|
HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and
|
||||||
|
provide a mechanism for recycling arrays. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
|
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -84,6 +85,9 @@ public class ClientContext {
|
||||||
*/
|
*/
|
||||||
private volatile boolean disableLegacyBlockReaderLocal = false;
|
private volatile boolean disableLegacyBlockReaderLocal = false;
|
||||||
|
|
||||||
|
/** Creating byte[] for {@link DFSOutputStream}. */
|
||||||
|
private final ByteArrayManager byteArrayManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether or not we complained about a DFSClient fetching a CacheContext that
|
* Whether or not we complained about a DFSClient fetching a CacheContext that
|
||||||
* didn't match its config values yet.
|
* didn't match its config values yet.
|
||||||
|
@ -105,6 +109,8 @@ public class ClientContext {
|
||||||
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
|
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
|
||||||
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
|
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
|
||||||
this.domainSocketFactory = new DomainSocketFactory(conf);
|
this.domainSocketFactory = new DomainSocketFactory(conf);
|
||||||
|
|
||||||
|
this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String confAsString(Conf conf) {
|
public static String confAsString(Conf conf) {
|
||||||
|
@ -204,4 +210,8 @@ public class ClientContext {
|
||||||
public DomainSocketFactory getDomainSocketFactory() {
|
public DomainSocketFactory getDomainSocketFactory() {
|
||||||
return domainSocketFactory;
|
return domainSocketFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ByteArrayManager getByteArrayManager() {
|
||||||
|
return byteArrayManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,8 +56,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||||
|
@ -195,6 +193,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -301,6 +300,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
final ChecksumOpt defaultChecksumOpt;
|
final ChecksumOpt defaultChecksumOpt;
|
||||||
final int writePacketSize;
|
final int writePacketSize;
|
||||||
final int writeMaxPackets;
|
final int writeMaxPackets;
|
||||||
|
final ByteArrayManager.Conf writeByteArrayManagerConf;
|
||||||
final int socketTimeout;
|
final int socketTimeout;
|
||||||
final int socketCacheCapacity;
|
final int socketCacheCapacity;
|
||||||
final long socketCacheExpiry;
|
final long socketCacheExpiry;
|
||||||
|
@ -371,8 +371,30 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
/** dfs.write.packet.size is an internal config variable */
|
/** dfs.write.packet.size is an internal config variable */
|
||||||
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||||
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
||||||
writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
|
writeMaxPackets = conf.getInt(
|
||||||
DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
||||||
|
|
||||||
|
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
|
||||||
|
if (!byteArrayManagerEnabled) {
|
||||||
|
writeByteArrayManagerConf = null;
|
||||||
|
} else {
|
||||||
|
final int countThreshold = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
|
||||||
|
final int countLimit = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
|
||||||
|
final long countResetTimePeriodMs = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
||||||
|
writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
||||||
|
countThreshold, countLimit, countResetTimePeriodMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
||||||
DFS_BLOCK_SIZE_DEFAULT);
|
DFS_BLOCK_SIZE_DEFAULT);
|
||||||
defaultReplication = (short) conf.getInt(
|
defaultReplication = (short) conf.getInt(
|
||||||
|
|
|
@ -50,10 +50,27 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
|
public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
|
||||||
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
|
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
|
||||||
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
|
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
|
||||||
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets";
|
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = "dfs.client.write.max-packets-in-flight";
|
||||||
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80;
|
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT = 80;
|
||||||
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
|
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
|
||||||
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY
|
||||||
|
= "dfs.client.write.byte-array-manager.enabled";
|
||||||
|
public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT
|
||||||
|
= false;
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY
|
||||||
|
= "dfs.client.write.byte-array-manager.count-threshold";
|
||||||
|
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT
|
||||||
|
= 128;
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY
|
||||||
|
= "dfs.client.write.byte-array-manager.count-limit";
|
||||||
|
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT
|
||||||
|
= 2048;
|
||||||
|
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY
|
||||||
|
= "dfs.client.write.byte-array-manager.count-reset-time-period-ms";
|
||||||
|
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
|
||||||
|
= 10L * 1000;
|
||||||
|
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
|
||||||
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
|
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
|
||||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
|
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
|
||||||
|
|
|
@ -41,10 +41,9 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSOutputSummer;
|
import org.apache.hadoop.fs.FSOutputSummer;
|
||||||
|
@ -55,7 +54,6 @@ import org.apache.hadoop.fs.Syncable;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -83,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
|
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -99,6 +98,7 @@ import org.htrace.Trace;
|
||||||
import org.htrace.TraceScope;
|
import org.htrace.TraceScope;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
|
@ -143,6 +143,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
CryptoProtocolVersion.supported();
|
CryptoProtocolVersion.supported();
|
||||||
|
|
||||||
private final DFSClient dfsClient;
|
private final DFSClient dfsClient;
|
||||||
|
private final ByteArrayManager byteArrayManager;
|
||||||
private Socket s;
|
private Socket s;
|
||||||
// closed is accessed by different threads under different locks.
|
// closed is accessed by different threads under different locks.
|
||||||
private volatile boolean closed = false;
|
private volatile boolean closed = false;
|
||||||
|
@ -181,6 +182,33 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
||||||
BlockStoragePolicySuite.createDefaultSuite();
|
BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
|
||||||
|
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||||
|
private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
||||||
|
long seqno) throws InterruptedIOException {
|
||||||
|
final byte[] buf;
|
||||||
|
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
|
||||||
|
|
||||||
|
try {
|
||||||
|
buf = byteArrayManager.newByteArray(bufferSize);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
final InterruptedIOException iioe = new InterruptedIOException(
|
||||||
|
"seqno=" + seqno);
|
||||||
|
iioe.initCause(ie);
|
||||||
|
throw iioe;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For heartbeat packets, create buffer directly by new byte[]
|
||||||
|
* since heartbeats should not be blocked.
|
||||||
|
*/
|
||||||
|
private Packet createHeartbeatPacket() throws InterruptedIOException {
|
||||||
|
final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
|
||||||
|
return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
|
||||||
|
}
|
||||||
|
|
||||||
private static class Packet {
|
private static class Packet {
|
||||||
private static final long HEART_BEAT_SEQNO = -1L;
|
private static final long HEART_BEAT_SEQNO = -1L;
|
||||||
long seqno; // sequencenumber of buffer in block
|
long seqno; // sequencenumber of buffer in block
|
||||||
|
@ -188,7 +216,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
boolean syncBlock; // this packet forces the current block to disk
|
boolean syncBlock; // this packet forces the current block to disk
|
||||||
int numChunks; // number of chunks currently in packet
|
int numChunks; // number of chunks currently in packet
|
||||||
final int maxChunks; // max chunks in packet
|
final int maxChunks; // max chunks in packet
|
||||||
final byte[] buf;
|
private byte[] buf;
|
||||||
private boolean lastPacketInBlock; // is this the last packet in block?
|
private boolean lastPacketInBlock; // is this the last packet in block?
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -210,13 +238,6 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
final int dataStart;
|
final int dataStart;
|
||||||
int dataPos;
|
int dataPos;
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a heartbeat packet.
|
|
||||||
*/
|
|
||||||
Packet(int checksumSize) {
|
|
||||||
this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new packet.
|
* Create a new packet.
|
||||||
*
|
*
|
||||||
|
@ -225,15 +246,15 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
* @param chunksPerPkt maximum number of chunks per packet.
|
* @param chunksPerPkt maximum number of chunks per packet.
|
||||||
* @param offsetInBlock offset in bytes into the HDFS block.
|
* @param offsetInBlock offset in bytes into the HDFS block.
|
||||||
*/
|
*/
|
||||||
Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
|
private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
|
||||||
long seqno, int checksumSize) {
|
int checksumSize) {
|
||||||
this.lastPacketInBlock = false;
|
this.lastPacketInBlock = false;
|
||||||
this.numChunks = 0;
|
this.numChunks = 0;
|
||||||
this.offsetInBlock = offsetInBlock;
|
this.offsetInBlock = offsetInBlock;
|
||||||
this.seqno = seqno;
|
this.seqno = seqno;
|
||||||
|
|
||||||
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
|
this.buf = buf;
|
||||||
|
|
||||||
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
|
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
|
||||||
checksumPos = checksumStart;
|
checksumPos = checksumStart;
|
||||||
dataStart = checksumStart + (chunksPerPkt * checksumSize);
|
dataStart = checksumStart + (chunksPerPkt * checksumSize);
|
||||||
|
@ -304,6 +325,11 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void releaseBuffer(ByteArrayManager bam) {
|
||||||
|
bam.release(buf);
|
||||||
|
buf = null;
|
||||||
|
}
|
||||||
|
|
||||||
// get the packet's last byte's offset in the block
|
// get the packet's last byte's offset in the block
|
||||||
long getLastByteOffsetBlock() {
|
long getLastByteOffsetBlock() {
|
||||||
|
@ -547,7 +573,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
// get packet to be sent.
|
// get packet to be sent.
|
||||||
if (dataQueue.isEmpty()) {
|
if (dataQueue.isEmpty()) {
|
||||||
one = new Packet(getChecksumSize()); // heartbeat packet
|
one = createHeartbeatPacket();
|
||||||
} else {
|
} else {
|
||||||
one = dataQueue.getFirst(); // regular data packet
|
one = dataQueue.getFirst(); // regular data packet
|
||||||
}
|
}
|
||||||
|
@ -907,6 +933,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
lastAckedSeqno = seqno;
|
lastAckedSeqno = seqno;
|
||||||
ackQueue.removeFirst();
|
ackQueue.removeFirst();
|
||||||
dataQueue.notifyAll();
|
dataQueue.notifyAll();
|
||||||
|
|
||||||
|
one.releaseBuffer(byteArrayManager);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (!responderClosed) {
|
if (!responderClosed) {
|
||||||
|
@ -1657,6 +1685,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
this.dfsclientSlowLogThresholdMs =
|
this.dfsclientSlowLogThresholdMs =
|
||||||
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
|
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
|
||||||
|
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Construct a new output stream for creating a file. */
|
/** Construct a new output stream for creating a file. */
|
||||||
|
@ -1836,8 +1865,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentPacket == null) {
|
if (currentPacket == null) {
|
||||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
bytesCurBlock, currentSeqno++);
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
||||||
currentPacket.seqno +
|
currentPacket.seqno +
|
||||||
|
@ -1884,8 +1913,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
// indicate the end of block and reset bytesCurBlock.
|
// indicate the end of block and reset bytesCurBlock.
|
||||||
//
|
//
|
||||||
if (bytesCurBlock == blockSize) {
|
if (bytesCurBlock == blockSize) {
|
||||||
currentPacket = new Packet(0, 0, bytesCurBlock,
|
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
||||||
currentSeqno++, getChecksumSize());
|
|
||||||
currentPacket.lastPacketInBlock = true;
|
currentPacket.lastPacketInBlock = true;
|
||||||
currentPacket.syncBlock = shouldSyncBlock;
|
currentPacket.syncBlock = shouldSyncBlock;
|
||||||
waitAndQueueCurrentPacket();
|
waitAndQueueCurrentPacket();
|
||||||
|
@ -1978,8 +2006,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
// Nothing to send right now,
|
// Nothing to send right now,
|
||||||
// but sync was requested.
|
// but sync was requested.
|
||||||
// Send an empty packet
|
// Send an empty packet
|
||||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
bytesCurBlock, currentSeqno++);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (isSync && bytesCurBlock > 0) {
|
if (isSync && bytesCurBlock > 0) {
|
||||||
|
@ -1987,8 +2015,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
// and the block was partially written,
|
// and the block was partially written,
|
||||||
// and sync was requested.
|
// and sync was requested.
|
||||||
// So send an empty sync packet.
|
// So send an empty sync packet.
|
||||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
bytesCurBlock, currentSeqno++);
|
||||||
} else {
|
} else {
|
||||||
// just discard the current packet since it is already been sent.
|
// just discard the current packet since it is already been sent.
|
||||||
currentPacket = null;
|
currentPacket = null;
|
||||||
|
@ -2192,7 +2220,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
if (bytesCurBlock != 0) {
|
if (bytesCurBlock != 0) {
|
||||||
// send an empty packet to mark the end of the block
|
// send an empty packet to mark the end of the block
|
||||||
currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
|
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
|
||||||
currentPacket.lastPacketInBlock = true;
|
currentPacket.lastPacketInBlock = true;
|
||||||
currentPacket.syncBlock = shouldSyncBlock;
|
currentPacket.syncBlock = shouldSyncBlock;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,419 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.util;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manage byte array creation and release.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public abstract class ByteArrayManager {
|
||||||
|
static final Log LOG = LogFactory.getLog(ByteArrayManager.class);
|
||||||
|
private static final ThreadLocal<StringBuilder> debugMessage = new ThreadLocal<StringBuilder>() {
|
||||||
|
protected StringBuilder initialValue() {
|
||||||
|
return new StringBuilder();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private static void logDebugMessage() {
|
||||||
|
final StringBuilder b = debugMessage.get();
|
||||||
|
LOG.debug(b);
|
||||||
|
b.setLength(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final int MIN_ARRAY_LENGTH = 32;
|
||||||
|
static final byte[] EMPTY_BYTE_ARRAY = {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the least power of two greater than or equal to n, i.e. return
|
||||||
|
* the least integer x with x >= n and x a power of two.
|
||||||
|
*
|
||||||
|
* @throws HadoopIllegalArgumentException
|
||||||
|
* if n <= 0.
|
||||||
|
*/
|
||||||
|
public static int leastPowerOfTwo(final int n) {
|
||||||
|
if (n <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException("n = " + n + " <= 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
final int highestOne = Integer.highestOneBit(n);
|
||||||
|
if (highestOne == n) {
|
||||||
|
return n; // n is a power of two.
|
||||||
|
}
|
||||||
|
final int roundUp = highestOne << 1;
|
||||||
|
if (roundUp < 0) {
|
||||||
|
final long overflow = ((long) highestOne) << 1;
|
||||||
|
throw new ArithmeticException(
|
||||||
|
"Overflow: for n = " + n + ", the least power of two (the least"
|
||||||
|
+ " integer x with x >= n and x a power of two) = "
|
||||||
|
+ overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
return roundUp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A counter with a time stamp so that it is reset automatically
|
||||||
|
* if there is no increment for the time period.
|
||||||
|
*/
|
||||||
|
static class Counter {
|
||||||
|
private final long countResetTimePeriodMs;
|
||||||
|
private long count = 0L;
|
||||||
|
private long timestamp = Time.monotonicNow();
|
||||||
|
|
||||||
|
Counter(long countResetTimePeriodMs) {
|
||||||
|
this.countResetTimePeriodMs = countResetTimePeriodMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized long getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment the counter, and reset it if there is no increment
|
||||||
|
* for acertain time period.
|
||||||
|
*
|
||||||
|
* @return the new count.
|
||||||
|
*/
|
||||||
|
synchronized long increment() {
|
||||||
|
final long now = Time.monotonicNow();
|
||||||
|
if (now - timestamp > countResetTimePeriodMs) {
|
||||||
|
count = 0; // reset the counter
|
||||||
|
}
|
||||||
|
timestamp = now;
|
||||||
|
return ++count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A map from integers to counters. */
|
||||||
|
static class CounterMap {
|
||||||
|
/** @see ByteArrayManager.Conf#countResetTimePeriodMs */
|
||||||
|
private final long countResetTimePeriodMs;
|
||||||
|
private final Map<Integer, Counter> map = new HashMap<Integer, Counter>();
|
||||||
|
|
||||||
|
private CounterMap(long countResetTimePeriodMs) {
|
||||||
|
this.countResetTimePeriodMs = countResetTimePeriodMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the counter for the given key;
|
||||||
|
* and create a new counter if it does not exist.
|
||||||
|
*/
|
||||||
|
synchronized Counter get(final Integer key, final boolean createIfNotExist) {
|
||||||
|
Counter count = map.get(key);
|
||||||
|
if (count == null && createIfNotExist) {
|
||||||
|
count = new Counter(countResetTimePeriodMs);
|
||||||
|
map.put(key, count);
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void clear() {
|
||||||
|
map.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Manage byte arrays with the same fixed length. */
|
||||||
|
static class FixedLengthManager {
|
||||||
|
private final int byteArrayLength;
|
||||||
|
private final int maxAllocated;
|
||||||
|
private final Queue<byte[]> freeQueue = new LinkedList<byte[]>();
|
||||||
|
|
||||||
|
private int numAllocated = 0;
|
||||||
|
|
||||||
|
FixedLengthManager(int arrayLength, int maxAllocated) {
|
||||||
|
this.byteArrayLength = arrayLength;
|
||||||
|
this.maxAllocated = maxAllocated;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a byte array.
|
||||||
|
*
|
||||||
|
* If the number of allocated arrays >= maximum, the current thread is
|
||||||
|
* blocked until the number of allocated arrays drops to below the maximum.
|
||||||
|
*
|
||||||
|
* The byte array allocated by this method must be returned for recycling
|
||||||
|
* via the {@link FixedLengthManager#recycle(byte[])} method.
|
||||||
|
*/
|
||||||
|
synchronized byte[] allocate() throws InterruptedException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", ").append(this);
|
||||||
|
}
|
||||||
|
for(; numAllocated >= maxAllocated;) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(": wait ...");
|
||||||
|
logDebugMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
wait();
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append("wake up: ").append(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numAllocated++;
|
||||||
|
|
||||||
|
final byte[] array = freeQueue.poll();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", recycled? ").append(array != null);
|
||||||
|
}
|
||||||
|
return array != null? array : new byte[byteArrayLength];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recycle the given byte array, which must have the same length as the
|
||||||
|
* array length managed by this object.
|
||||||
|
*
|
||||||
|
* The byte array may or may not be allocated
|
||||||
|
* by the {@link FixedLengthManager#allocate()} method.
|
||||||
|
*/
|
||||||
|
synchronized int recycle(byte[] array) {
|
||||||
|
Preconditions.checkNotNull(array);
|
||||||
|
Preconditions.checkArgument(array.length == byteArrayLength);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", ").append(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numAllocated == maxAllocated) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", notifyAll");
|
||||||
|
}
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
numAllocated--;
|
||||||
|
if (numAllocated < 0) {
|
||||||
|
// it is possible to drop below 0 since
|
||||||
|
// some byte arrays may not be created by the allocate() method.
|
||||||
|
numAllocated = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (freeQueue.size() < maxAllocated - numAllocated) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", freeQueue.offer");
|
||||||
|
}
|
||||||
|
freeQueue.offer(array);
|
||||||
|
}
|
||||||
|
return freeQueue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized String toString() {
|
||||||
|
return "[" + byteArrayLength + ": " + numAllocated + "/"
|
||||||
|
+ maxAllocated + ", free=" + freeQueue.size() + "]";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A map from array lengths to byte array managers. */
|
||||||
|
static class ManagerMap {
|
||||||
|
private final int countLimit;
|
||||||
|
private final Map<Integer, FixedLengthManager> map = new HashMap<Integer, FixedLengthManager>();
|
||||||
|
|
||||||
|
ManagerMap(int countLimit) {
|
||||||
|
this.countLimit = countLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return the manager for the given array length. */
|
||||||
|
synchronized FixedLengthManager get(final Integer arrayLength,
|
||||||
|
final boolean createIfNotExist) {
|
||||||
|
FixedLengthManager manager = map.get(arrayLength);
|
||||||
|
if (manager == null && createIfNotExist) {
|
||||||
|
manager = new FixedLengthManager(arrayLength, countLimit);
|
||||||
|
map.put(arrayLength, manager);
|
||||||
|
}
|
||||||
|
return manager;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void clear() {
|
||||||
|
map.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Conf {
|
||||||
|
/**
|
||||||
|
* The count threshold for each array length so that a manager is created
|
||||||
|
* only after the allocation count exceeds the threshold.
|
||||||
|
*/
|
||||||
|
private final int countThreshold;
|
||||||
|
/**
|
||||||
|
* The maximum number of arrays allowed for each array length.
|
||||||
|
*/
|
||||||
|
private final int countLimit;
|
||||||
|
/**
|
||||||
|
* The time period in milliseconds that the allocation count for each array
|
||||||
|
* length is reset to zero if there is no increment.
|
||||||
|
*/
|
||||||
|
private final long countResetTimePeriodMs;
|
||||||
|
|
||||||
|
public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) {
|
||||||
|
this.countThreshold = countThreshold;
|
||||||
|
this.countLimit = countLimit;
|
||||||
|
this.countResetTimePeriodMs = countResetTimePeriodMs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a byte array for the given length, where the length of
|
||||||
|
* the returned array is larger than or equal to the given length.
|
||||||
|
*
|
||||||
|
* The current thread may be blocked if some resource is unavailable.
|
||||||
|
*
|
||||||
|
* The byte array created by this method must be released
|
||||||
|
* via the {@link ByteArrayManager#release(byte[])} method.
|
||||||
|
*
|
||||||
|
* @return a byte array with length larger than or equal to the given length.
|
||||||
|
*/
|
||||||
|
public abstract byte[] newByteArray(int size) throws InterruptedException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release the given byte array.
|
||||||
|
*
|
||||||
|
* The byte array may or may not be created
|
||||||
|
* by the {@link ByteArrayManager#newByteArray(int)} method.
|
||||||
|
*
|
||||||
|
* @return the number of free array.
|
||||||
|
*/
|
||||||
|
public abstract int release(byte[] array);
|
||||||
|
|
||||||
|
public static ByteArrayManager newInstance(Conf conf) {
|
||||||
|
return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A dummy implementation which simply calls new byte[].
|
||||||
|
*/
|
||||||
|
static class NewByteArrayWithoutLimit extends ByteArrayManager {
|
||||||
|
@Override
|
||||||
|
public byte[] newByteArray(int size) throws InterruptedException {
|
||||||
|
return new byte[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int release(byte[] array) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manage byte array allocation and provide a mechanism for recycling the byte
|
||||||
|
* array objects.
|
||||||
|
*/
|
||||||
|
static class Impl extends ByteArrayManager {
|
||||||
|
private final Conf conf;
|
||||||
|
|
||||||
|
private final CounterMap counters;
|
||||||
|
private final ManagerMap managers;
|
||||||
|
|
||||||
|
Impl(Conf conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.counters = new CounterMap(conf.countResetTimePeriodMs);
|
||||||
|
this.managers = new ManagerMap(conf.countLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a byte array, where the length of the allocated array
|
||||||
|
* is the least power of two of the given length
|
||||||
|
* unless the given length is less than {@link #MIN_ARRAY_LENGTH}.
|
||||||
|
* In such case, the returned array length is equal to {@link #MIN_ARRAY_LENGTH}.
|
||||||
|
*
|
||||||
|
* If the number of allocated arrays exceeds the capacity,
|
||||||
|
* the current thread is blocked until
|
||||||
|
* the number of allocated arrays drops to below the capacity.
|
||||||
|
*
|
||||||
|
* The byte array allocated by this method must be returned for recycling
|
||||||
|
* via the {@link ByteArrayManager#recycle(byte[])} method.
|
||||||
|
*
|
||||||
|
* @return a byte array with length larger than or equal to the given length.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public byte[] newByteArray(final int arrayLength) throws InterruptedException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append("allocate(").append(arrayLength).append(")");
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] array;
|
||||||
|
if (arrayLength == 0) {
|
||||||
|
array = EMPTY_BYTE_ARRAY;
|
||||||
|
} else {
|
||||||
|
final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH?
|
||||||
|
MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength);
|
||||||
|
final long count = counters.get(powerOfTwo, true).increment();
|
||||||
|
final boolean aboveThreshold = count > conf.countThreshold;
|
||||||
|
// create a new manager only if the count is above threshold.
|
||||||
|
final FixedLengthManager manager = managers.get(powerOfTwo, aboveThreshold);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(": count=").append(count)
|
||||||
|
.append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
|
||||||
|
}
|
||||||
|
array = manager != null? manager.allocate(): new byte[powerOfTwo];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
logDebugMessage();
|
||||||
|
}
|
||||||
|
return array;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recycle the given byte array.
|
||||||
|
*
|
||||||
|
* The byte array may or may not be allocated
|
||||||
|
* by the {@link ByteArrayManager#allocate(int)} method.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int release(final byte[] array) {
|
||||||
|
Preconditions.checkNotNull(array);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append("recycle: array.length=").append(array.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
final int freeQueueSize;
|
||||||
|
if (array.length == 0) {
|
||||||
|
freeQueueSize = -1;
|
||||||
|
} else {
|
||||||
|
final FixedLengthManager manager = managers.get(array.length, false);
|
||||||
|
freeQueueSize = manager == null? -1: manager.recycle(array);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
debugMessage.get().append(", freeQueueSize=").append(freeQueueSize);
|
||||||
|
logDebugMessage();
|
||||||
|
}
|
||||||
|
return freeQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
CounterMap getCounters() {
|
||||||
|
return counters;
|
||||||
|
}
|
||||||
|
|
||||||
|
ManagerMap getManagers() {
|
||||||
|
return managers;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,635 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.util;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
|
||||||
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test {@link ByteArrayManager}.
|
||||||
|
*/
|
||||||
|
public class TestByteArrayManager {
|
||||||
|
static {
|
||||||
|
((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
|
||||||
|
).getLogger().setLevel(Level.ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestByteArrayManager.class);
|
||||||
|
|
||||||
|
private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Future<Integer> left, Future<Integer> right) {
|
||||||
|
try {
|
||||||
|
return left.get().intValue() - right.get().intValue();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCounter() throws Exception {
|
||||||
|
final long countResetTimePeriodMs = 200L;
|
||||||
|
final Counter c = new Counter(countResetTimePeriodMs);
|
||||||
|
|
||||||
|
final int n = DFSUtil.getRandom().nextInt(512) + 512;
|
||||||
|
final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
|
||||||
|
|
||||||
|
final ExecutorService pool = Executors.newFixedThreadPool(32);
|
||||||
|
try {
|
||||||
|
// increment
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
futures.add(pool.submit(new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return (int)c.increment();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort and wait for the futures
|
||||||
|
Collections.sort(futures, CMP);
|
||||||
|
} finally {
|
||||||
|
pool.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
// check futures
|
||||||
|
Assert.assertEquals(n, futures.size());
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
Assert.assertEquals(i + 1, futures.get(i).get().intValue());
|
||||||
|
}
|
||||||
|
Assert.assertEquals(n, c.getCount());
|
||||||
|
|
||||||
|
// test auto-reset
|
||||||
|
Thread.sleep(countResetTimePeriodMs + 100);
|
||||||
|
Assert.assertEquals(1, c.increment());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateRecycle() throws Exception {
|
||||||
|
final int countThreshold = 4;
|
||||||
|
final int countLimit = 8;
|
||||||
|
final long countResetTimePeriodMs = 200L;
|
||||||
|
final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
|
||||||
|
new ByteArrayManager.Conf(
|
||||||
|
countThreshold, countLimit, countResetTimePeriodMs));
|
||||||
|
|
||||||
|
final CounterMap counters = bam.getCounters();
|
||||||
|
final ManagerMap managers = bam.getManagers();
|
||||||
|
|
||||||
|
final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
|
||||||
|
final int arrayLength = 1024;
|
||||||
|
|
||||||
|
|
||||||
|
final Allocator allocator = new Allocator(bam);
|
||||||
|
final Recycler recycler = new Recycler(bam);
|
||||||
|
try {
|
||||||
|
{ // allocate within threshold
|
||||||
|
for(int i = 0; i < countThreshold; i++) {
|
||||||
|
allocator.submit(arrayLength);
|
||||||
|
}
|
||||||
|
waitForAll(allocator.futures);
|
||||||
|
|
||||||
|
Assert.assertEquals(countThreshold,
|
||||||
|
counters.get(arrayLength, false).getCount());
|
||||||
|
Assert.assertNull(managers.get(arrayLength, false));
|
||||||
|
for(int n : uncommonArrays) {
|
||||||
|
Assert.assertNull(counters.get(n, false));
|
||||||
|
Assert.assertNull(managers.get(n, false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // recycle half of the arrays
|
||||||
|
for(int i = 0; i < countThreshold/2; i++) {
|
||||||
|
recycler.submit(removeLast(allocator.futures));
|
||||||
|
}
|
||||||
|
|
||||||
|
for(Future<Integer> f : recycler.furtures) {
|
||||||
|
Assert.assertEquals(-1, f.get().intValue());
|
||||||
|
}
|
||||||
|
recycler.furtures.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // allocate one more
|
||||||
|
allocator.submit(arrayLength).get();
|
||||||
|
|
||||||
|
Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
|
||||||
|
Assert.assertNotNull(managers.get(arrayLength, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // recycle the remaining arrays
|
||||||
|
final int n = allocator.recycleAll(recycler);
|
||||||
|
|
||||||
|
recycler.verify(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// allocate until the maximum.
|
||||||
|
for(int i = 0; i < countLimit; i++) {
|
||||||
|
allocator.submit(arrayLength);
|
||||||
|
}
|
||||||
|
waitForAll(allocator.futures);
|
||||||
|
|
||||||
|
// allocate one more should be blocked
|
||||||
|
final AllocatorThread t = new AllocatorThread(arrayLength, bam);
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
// check if the thread is waiting, timed wait or runnable.
|
||||||
|
for(int i = 0; i < 5; i++) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
final Thread.State threadState = t.getState();
|
||||||
|
if (threadState != Thread.State.RUNNABLE
|
||||||
|
&& threadState != Thread.State.WAITING
|
||||||
|
&& threadState != Thread.State.TIMED_WAITING) {
|
||||||
|
Assert.fail("threadState = " + threadState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recycle an array
|
||||||
|
recycler.submit(removeLast(allocator.futures));
|
||||||
|
Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
|
||||||
|
|
||||||
|
// check if the thread is unblocked
|
||||||
|
Thread.sleep(100);
|
||||||
|
Assert.assertEquals(Thread.State.TERMINATED, t.getState());
|
||||||
|
|
||||||
|
// recycle the remaining, the recycle should be full.
|
||||||
|
Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
|
||||||
|
recycler.submit(t.array);
|
||||||
|
recycler.verify(countLimit);
|
||||||
|
|
||||||
|
// recycle one more; it should not increase the free queue size
|
||||||
|
Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
allocator.pool.shutdown();
|
||||||
|
recycler.pool.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static <T> T removeLast(List<Future<T>> furtures) throws Exception {
|
||||||
|
return remove(furtures, furtures.size() - 1);
|
||||||
|
}
|
||||||
|
static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
|
||||||
|
return furtures.isEmpty()? null: furtures.remove(i).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
|
||||||
|
for(Future<T> f : furtures) {
|
||||||
|
f.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AllocatorThread extends Thread {
|
||||||
|
private final ByteArrayManager bam;
|
||||||
|
private final int arrayLength;
|
||||||
|
private byte[] array;
|
||||||
|
|
||||||
|
AllocatorThread(int arrayLength, ByteArrayManager bam) {
|
||||||
|
this.bam = bam;
|
||||||
|
this.arrayLength = arrayLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
array = bam.newByteArray(arrayLength);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Allocator {
|
||||||
|
private final ByteArrayManager bam;
|
||||||
|
final ExecutorService pool = Executors.newFixedThreadPool(8);
|
||||||
|
final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
|
||||||
|
|
||||||
|
Allocator(ByteArrayManager bam) {
|
||||||
|
this.bam = bam;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<byte[]> submit(final int arrayLength) {
|
||||||
|
final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
|
||||||
|
@Override
|
||||||
|
public byte[] call() throws Exception {
|
||||||
|
final byte[] array = bam.newByteArray(arrayLength);
|
||||||
|
Assert.assertEquals(arrayLength, array.length);
|
||||||
|
return array;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
futures.add(f);
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
int recycleAll(Recycler recycler) throws Exception {
|
||||||
|
final int n = futures.size();
|
||||||
|
for(Future<byte[]> f : futures) {
|
||||||
|
recycler.submit(f.get());
|
||||||
|
}
|
||||||
|
futures.clear();
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Recycler {
|
||||||
|
private final ByteArrayManager bam;
|
||||||
|
final ExecutorService pool = Executors.newFixedThreadPool(8);
|
||||||
|
final List<Future<Integer>> furtures = new LinkedList<Future<Integer>>();
|
||||||
|
|
||||||
|
Recycler(ByteArrayManager bam) {
|
||||||
|
this.bam = bam;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Integer> submit(final byte[] array) {
|
||||||
|
final Future<Integer> f = pool.submit(new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return bam.release(array);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
furtures.add(f);
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
void verify(final int expectedSize) throws Exception {
|
||||||
|
Assert.assertEquals(expectedSize, furtures.size());
|
||||||
|
Collections.sort(furtures, CMP);
|
||||||
|
for(int i = 0; i < furtures.size(); i++) {
|
||||||
|
Assert.assertEquals(i+1, furtures.get(i).get().intValue());
|
||||||
|
}
|
||||||
|
furtures.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testByteArrayManager() throws Exception {
|
||||||
|
final int countThreshold = 32;
|
||||||
|
final int countLimit = 64;
|
||||||
|
final long countResetTimePeriodMs = 1000L;
|
||||||
|
final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
|
||||||
|
new ByteArrayManager.Conf(
|
||||||
|
countThreshold, countLimit, countResetTimePeriodMs));
|
||||||
|
|
||||||
|
final CounterMap counters = bam.getCounters();
|
||||||
|
final ManagerMap managers = bam.getManagers();
|
||||||
|
|
||||||
|
final ExecutorService pool = Executors.newFixedThreadPool(128);
|
||||||
|
|
||||||
|
final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
|
||||||
|
final Thread[] threads = new Thread[runners.length];
|
||||||
|
|
||||||
|
final int num = 1 << 8;
|
||||||
|
for(int i = 0; i < runners.length; i++) {
|
||||||
|
runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
|
||||||
|
threads[i] = runners[i].start(num);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Thread randomRecycler = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("randomRecycler start");
|
||||||
|
for(int i = 0; shouldRun(); i++) {
|
||||||
|
final int j = DFSUtil.getRandom().nextInt(runners.length);
|
||||||
|
try {
|
||||||
|
runners[j].recycle();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Assert.fail(this + " has " + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((i & 0xFF) == 0) {
|
||||||
|
sleepMs(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("randomRecycler done");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean shouldRun() {
|
||||||
|
for(int i = 0; i < runners.length; i++) {
|
||||||
|
if (threads[i].isAlive()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!runners[i].isEmpty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
randomRecycler.start();
|
||||||
|
|
||||||
|
randomRecycler.join();
|
||||||
|
|
||||||
|
Assert.assertNull(counters.get(0, false));
|
||||||
|
for(int i = 1; i < runners.length; i++) {
|
||||||
|
if (!runners[i].assertionErrors.isEmpty()) {
|
||||||
|
for(AssertionError e : runners[i].assertionErrors) {
|
||||||
|
LOG.error("AssertionError " + i, e);
|
||||||
|
}
|
||||||
|
Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
|
||||||
|
}
|
||||||
|
|
||||||
|
final int arrayLength = Runner.index2arrayLength(i);
|
||||||
|
final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold;
|
||||||
|
final FixedLengthManager m = managers.get(arrayLength, false);
|
||||||
|
if (exceedCountThreshold) {
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
} else {
|
||||||
|
Assert.assertNull(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void sleepMs(long ms) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(ms);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Assert.fail("Sleep is interrupted: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Runner implements Runnable {
|
||||||
|
static final int NUM_RUNNERS = 4;
|
||||||
|
|
||||||
|
static int index2arrayLength(int index) {
|
||||||
|
return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ByteArrayManager bam;
|
||||||
|
final int maxArrayLength;
|
||||||
|
final int countThreshold;
|
||||||
|
final int maxArrays;
|
||||||
|
final ExecutorService pool;
|
||||||
|
final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();
|
||||||
|
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
final int p;
|
||||||
|
private int n;
|
||||||
|
|
||||||
|
final List<AssertionError> assertionErrors = new ArrayList<AssertionError>();
|
||||||
|
|
||||||
|
Runner(int index, int countThreshold, int maxArrays,
|
||||||
|
ExecutorService pool, int p, ByteArrayManager bam) {
|
||||||
|
this.maxArrayLength = index2arrayLength(index);
|
||||||
|
this.countThreshold = countThreshold;
|
||||||
|
this.maxArrays = maxArrays;
|
||||||
|
this.pool = pool;
|
||||||
|
this.p = p;
|
||||||
|
this.bam = bam;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isEmpty() {
|
||||||
|
synchronized (arrays) {
|
||||||
|
return arrays.isEmpty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<byte[]> submitAllocate() {
|
||||||
|
count.incrementAndGet();
|
||||||
|
|
||||||
|
final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
|
||||||
|
@Override
|
||||||
|
public byte[] call() throws Exception {
|
||||||
|
final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
|
||||||
|
0: maxArrayLength >> 1;
|
||||||
|
final int arrayLength = DFSUtil.getRandom().nextInt(
|
||||||
|
maxArrayLength - lower) + lower + 1;
|
||||||
|
final byte[] array = bam.newByteArray(arrayLength);
|
||||||
|
try {
|
||||||
|
Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
|
||||||
|
maxArrayLength, array.length);
|
||||||
|
} catch(AssertionError e) {
|
||||||
|
assertionErrors.add(e);
|
||||||
|
}
|
||||||
|
return array;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
synchronized (arrays) {
|
||||||
|
arrays.add(f);
|
||||||
|
}
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] removeFirst() throws Exception {
|
||||||
|
synchronized (arrays) {
|
||||||
|
return remove(arrays, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void recycle() throws Exception {
|
||||||
|
final byte[] a = removeFirst();
|
||||||
|
if (a != null) {
|
||||||
|
recycle(a);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int recycle(final byte[] array) {
|
||||||
|
return bam.release(array);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Integer> submitRecycle(final byte[] array) {
|
||||||
|
count.decrementAndGet();
|
||||||
|
|
||||||
|
final Future<Integer> f = pool.submit(new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return recycle(array);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p;
|
||||||
|
if (isAllocate) {
|
||||||
|
submitAllocate();
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
final byte[] a = removeFirst();
|
||||||
|
if (a != null) {
|
||||||
|
submitRecycle(a);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
Assert.fail(this + " has " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((i & 0xFF) == 0) {
|
||||||
|
sleepMs(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread start(int n) {
|
||||||
|
this.n = n;
|
||||||
|
final Thread t = new Thread(this);
|
||||||
|
t.start();
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getClass().getSimpleName() + ": max=" + maxArrayLength
|
||||||
|
+ ", count=" + count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class NewByteArrayWithLimit extends ByteArrayManager {
|
||||||
|
private final int maxCount;
|
||||||
|
private int count = 0;
|
||||||
|
|
||||||
|
NewByteArrayWithLimit(int maxCount) {
|
||||||
|
this.maxCount = maxCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized byte[] newByteArray(int size) throws InterruptedException {
|
||||||
|
for(; count >= maxCount; ) {
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
return new byte[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int release(byte[] array) {
|
||||||
|
if (count == maxCount) {
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
count--;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
|
||||||
|
).getLogger().setLevel(Level.OFF);
|
||||||
|
|
||||||
|
final int arrayLength = 64 * 1024; //64k
|
||||||
|
final int nThreads = 512;
|
||||||
|
final int nAllocations = 1 << 15;
|
||||||
|
final int maxArrays = 1 << 10;
|
||||||
|
final int nTrials = 5;
|
||||||
|
|
||||||
|
System.out.println("arrayLength=" + arrayLength
|
||||||
|
+ ", nThreads=" + nThreads
|
||||||
|
+ ", nAllocations=" + nAllocations
|
||||||
|
+ ", maxArrays=" + maxArrays);
|
||||||
|
|
||||||
|
final Random ran = DFSUtil.getRandom();
|
||||||
|
final ByteArrayManager[] impls = {
|
||||||
|
new ByteArrayManager.NewByteArrayWithoutLimit(),
|
||||||
|
new NewByteArrayWithLimit(maxArrays),
|
||||||
|
new ByteArrayManager.Impl(new ByteArrayManager.Conf(
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT,
|
||||||
|
maxArrays,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
|
||||||
|
};
|
||||||
|
final double[] avg = new double[impls.length];
|
||||||
|
|
||||||
|
for(int i = 0; i < impls.length; i++) {
|
||||||
|
double duration = 0;
|
||||||
|
printf("%26s:", impls[i].getClass().getSimpleName());
|
||||||
|
for(int j = 0; j < nTrials; j++) {
|
||||||
|
final int[] sleepTime = new int[nAllocations];
|
||||||
|
for(int k = 0; k < sleepTime.length; k++) {
|
||||||
|
sleepTime[k] = ran.nextInt(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
|
||||||
|
sleepTime, impls[i]);
|
||||||
|
duration += elapsed;
|
||||||
|
printf("%5d, ", elapsed);
|
||||||
|
}
|
||||||
|
avg[i] = duration/nTrials;
|
||||||
|
printf("avg=%6.3fs", avg[i]/1000);
|
||||||
|
for(int j = 0; j < i; j++) {
|
||||||
|
printf(" (%6.2f%%)", percentageDiff(avg[j], avg[i]));
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static double percentageDiff(double original, double newValue) {
|
||||||
|
return (newValue - original)/original*100;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void printf(String format, Object... args) {
|
||||||
|
System.out.printf(format, args);
|
||||||
|
System.out.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
static long performanceTest(final int arrayLength, final int maxArrays,
|
||||||
|
final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
|
||||||
|
throws Exception {
|
||||||
|
final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
|
||||||
|
final List<Future<Void>> futures = new ArrayList<Future<Void>>(sleepTimeMSs.length);
|
||||||
|
final long startTime = Time.monotonicNow();
|
||||||
|
|
||||||
|
for(int i = 0; i < sleepTimeMSs.length; i++) {
|
||||||
|
final long sleepTime = sleepTimeMSs[i];
|
||||||
|
futures.add(pool.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
byte[] array = impl.newByteArray(arrayLength);
|
||||||
|
sleepMs(sleepTime);
|
||||||
|
impl.release(array);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
for(Future<Void> f : futures) {
|
||||||
|
f.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
final long endTime = Time.monotonicNow();
|
||||||
|
pool.shutdown();
|
||||||
|
return endTime - startTime;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue