HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and provide a mechanism for recycling arrays.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
This commit is contained in:
Tsz-Wo Nicholas Sze 2014-11-01 11:22:13 -07:00
parent 5d4d11400e
commit a4dca48676
7 changed files with 1166 additions and 32 deletions

View File

@ -300,6 +300,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7313. Support optional configuration of AES cipher suite on
DataTransferProtocol. (cnauroth)
HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and
provide a mechanism for recycling arrays. (szetszwo)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import com.google.common.annotations.VisibleForTesting;
@ -84,6 +85,9 @@ public class ClientContext {
*/
private volatile boolean disableLegacyBlockReaderLocal = false;
/** Creates the byte[] buffers used by {@link DFSOutputStream}. */
private final ByteArrayManager byteArrayManager;
/**
* Whether or not we complained about a DFSClient fetching a CacheContext that
* didn't match its config values yet.
@ -105,6 +109,8 @@ public class ClientContext {
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
this.domainSocketFactory = new DomainSocketFactory(conf);
this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
}
public static String confAsString(Conf conf) {
@ -204,4 +210,8 @@ public class ClientContext {
public DomainSocketFactory getDomainSocketFactory() {
return domainSocketFactory;
}
/**
* @return the {@link ByteArrayManager} held by this context.
*/
public ByteArrayManager getByteArrayManager() {
return byteArrayManager;
}
}

View File

@ -56,8 +56,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@ -193,6 +191,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
@ -288,6 +287,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final ChecksumOpt defaultChecksumOpt;
final int writePacketSize;
final int writeMaxPackets;
final ByteArrayManager.Conf writeByteArrayManagerConf;
final int socketTimeout;
final int socketCacheCapacity;
final long socketCacheExpiry;
@ -358,8 +358,30 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
writeMaxPackets = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
final boolean byteArrayManagerEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
defaultReplication = (short) conf.getInt(

View File

@ -50,10 +50,27 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets";
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80;
public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = "dfs.client.write.max-packets-in-flight";
public static final int DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT = 80;
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY
= "dfs.client.write.byte-array-manager.enabled";
public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT
= false;
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY
= "dfs.client.write.byte-array-manager.count-threshold";
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT
= 128;
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY
= "dfs.client.write.byte-array-manager.count-limit";
public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT
= 2048;
public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY
= "dfs.client.write.byte-array-manager.count-reset-time-period-ms";
public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT
= 10L * 1000;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";

View File

@ -41,10 +41,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer;
@ -55,7 +54,6 @@ import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -83,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
@ -99,6 +98,7 @@ import org.htrace.Trace;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@ -143,6 +143,7 @@ public class DFSOutputStream extends FSOutputSummer
CryptoProtocolVersion.supported();
private final DFSClient dfsClient;
private final ByteArrayManager byteArrayManager;
private Socket s;
// closed is accessed by different threads under different locks.
private volatile boolean closed = false;
@ -181,6 +182,33 @@ public class DFSOutputStream extends FSOutputSummer
private static final BlockStoragePolicySuite blockStoragePolicySuite =
BlockStoragePolicySuite.createDefaultSuite();
/**
 * Create a data (non-heartbeat) packet whose buffer is obtained from the
 * {@link ByteArrayManager} rather than allocated directly.
 *
 * @param packetSize size of the packet body; the buffer request adds
 *          {@link PacketHeader#PKT_MAX_HEADER_LEN} on top of it.
 * @param chunksPerPkt maximum number of chunks per packet.
 * @param offsetInBlock offset in bytes into the HDFS block.
 * @param seqno sequence number of the packet.
 * @return the newly created packet.
 * @throws InterruptedIOException if obtaining the buffer is interrupted;
 *           the original {@link InterruptedException} is set as the cause.
 */
private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
    long seqno) throws InterruptedIOException {
  final int requestedLength = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
  final byte[] buffer;
  try {
    buffer = byteArrayManager.newByteArray(requestedLength);
  } catch (InterruptedException interrupted) {
    final InterruptedIOException wrapped =
        new InterruptedIOException("seqno=" + seqno);
    wrapped.initCause(interrupted);
    throw wrapped;
  }
  return new Packet(buffer, chunksPerPkt, offsetInBlock, seqno,
      getChecksumSize());
}
/**
 * Create a heartbeat packet. Its header-only buffer is allocated with a
 * plain new byte[] instead of going through the {@link ByteArrayManager},
 * since heartbeats should not be blocked.
 *
 * @return a packet with no chunks, offset zero and the heartbeat seqno.
 */
private Packet createHeartbeatPacket() throws InterruptedIOException {
  return new Packet(new byte[PacketHeader.PKT_MAX_HEADER_LEN], 0, 0,
      Packet.HEART_BEAT_SEQNO, getChecksumSize());
}
private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
long seqno; // sequencenumber of buffer in block
@ -188,7 +216,7 @@ public class DFSOutputStream extends FSOutputSummer
boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet
final int maxChunks; // max chunks in packet
final byte[] buf;
private byte[] buf;
private boolean lastPacketInBlock; // is this the last packet in block?
/**
@ -210,13 +238,6 @@ public class DFSOutputStream extends FSOutputSummer
final int dataStart;
int dataPos;
/**
* Create a heartbeat packet.
*/
Packet(int checksumSize) {
this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
}
/**
* Create a new packet.
*
@ -225,15 +246,15 @@ public class DFSOutputStream extends FSOutputSummer
* @param chunksPerPkt maximum number of chunks per packet.
* @param offsetInBlock offset in bytes into the HDFS block.
*/
Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
long seqno, int checksumSize) {
private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
int checksumSize) {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
this.seqno = seqno;
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
this.buf = buf;
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
dataStart = checksumStart + (chunksPerPkt * checksumSize);
@ -304,6 +325,11 @@ public class DFSOutputStream extends FSOutputSummer
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}
}
/**
* Pass this packet's buffer to {@link ByteArrayManager#release(byte[])}
* on the given manager and drop the packet's reference to it.
*
* @param bam the manager that receives the buffer.
*/
private void releaseBuffer(ByteArrayManager bam) {
bam.release(buf);
// clear the field so this packet no longer refers to the released array
buf = null;
}
// get the packet's last byte's offset in the block
long getLastByteOffsetBlock() {
@ -547,7 +573,7 @@ public class DFSOutputStream extends FSOutputSummer
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = new Packet(getChecksumSize()); // heartbeat packet
one = createHeartbeatPacket();
} else {
one = dataQueue.getFirst(); // regular data packet
}
@ -907,6 +933,8 @@ public class DFSOutputStream extends FSOutputSummer
lastAckedSeqno = seqno;
ackQueue.removeFirst();
dataQueue.notifyAll();
one.releaseBuffer(byteArrayManager);
}
} catch (Exception e) {
if (!responderClosed) {
@ -1657,6 +1685,7 @@ public class DFSOutputStream extends FSOutputSummer
this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
}
/** Construct a new output stream for creating a file. */
@ -1836,8 +1865,8 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
@ -1884,8 +1913,7 @@ public class DFSOutputStream extends FSOutputSummer
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, getChecksumSize());
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@ -1978,8 +2006,8 @@ public class DFSOutputStream extends FSOutputSummer
// Nothing to send right now,
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
}
} else {
if (isSync && bytesCurBlock > 0) {
@ -1987,8 +2015,8 @@ public class DFSOutputStream extends FSOutputSummer
// and the block was partially written,
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
@ -2192,7 +2220,7 @@ public class DFSOutputStream extends FSOutputSummer
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}

View File

@ -0,0 +1,419 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
/**
* Manage byte array creation and release.
*/
@InterfaceAudience.Private
public abstract class ByteArrayManager {
static final Log LOG = LogFactory.getLog(ByteArrayManager.class);
/**
* Per-thread buffer in which a debug message is accumulated piece by piece;
* it is written out and emptied by logDebugMessage().
*/
private static final ThreadLocal<StringBuilder> debugMessage = new ThreadLocal<StringBuilder>() {
// each thread starts with its own empty builder
protected StringBuilder initialValue() {
return new StringBuilder();
}
};
/**
* Log the current thread's accumulated debug message at debug level
* and then empty the buffer.
*/
private static void logDebugMessage() {
final StringBuilder b = debugMessage.get();
LOG.debug(b);
// reset the builder so the next message starts from scratch
b.setLength(0);
}
static final int MIN_ARRAY_LENGTH = 32;
static final byte[] EMPTY_BYTE_ARRAY = {};
/**
 * Round a positive integer up to a power of two.
 *
 * @param n a positive integer.
 * @return the least integer x with x >= n and x a power of two.
 *
 * @throws HadoopIllegalArgumentException
 *           if n <= 0.
 * @throws ArithmeticException
 *           if the result does not fit in an int.
 */
public static int leastPowerOfTwo(final int n) {
  if (n <= 0) {
    throw new HadoopIllegalArgumentException("n = " + n + " <= 0");
  }

  final int topBit = Integer.highestOneBit(n);
  if (topBit == n) {
    // n is already a power of two.
    return n;
  }

  final int doubled = topBit << 1;
  if (doubled >= 0) {
    return doubled;
  }

  // The shift wrapped around to a negative value; redo it as a long so
  // that the true value can be reported in the message.
  final long overflow = ((long) topBit) << 1;
  throw new ArithmeticException(
      "Overflow: for n = " + n + ", the least power of two (the least"
      + " integer x with x >= n and x a power of two) = "
      + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
}
/**
 * A counter that records the time of its last increment. When more than
 * the configured time period has passed since the previous increment,
 * the next increment starts counting from scratch.
 */
static class Counter {
  private final long countResetTimePeriodMs;
  private long count = 0L;
  private long timestamp = Time.monotonicNow();

  Counter(long countResetTimePeriodMs) {
    this.countResetTimePeriodMs = countResetTimePeriodMs;
  }

  /** @return the current count; reading it never resets the counter. */
  synchronized long getCount() {
    return count;
  }

  /**
   * Increment the counter, first resetting it to zero if there was
   * no increment for more than the reset time period.
   *
   * @return the new count.
   */
  synchronized long increment() {
    final long now = Time.monotonicNow();
    final boolean expired = now - timestamp > countResetTimePeriodMs;
    timestamp = now;
    // an expired counter restarts at zero and is then incremented to one
    count = expired ? 1L : count + 1;
    return count;
  }
}
/** A synchronized map from integer keys to {@link Counter} objects. */
static class CounterMap {
  /** @see ByteArrayManager.Conf#countResetTimePeriodMs */
  private final long countResetTimePeriodMs;
  private final Map<Integer, Counter> map = new HashMap<Integer, Counter>();

  private CounterMap(long countResetTimePeriodMs) {
    this.countResetTimePeriodMs = countResetTimePeriodMs;
  }

  /**
   * Look up the counter for the given key.
   *
   * @param key the key to look up.
   * @param createIfNotExist whether to create and store a new counter
   *          when none exists for the key.
   * @return the counter for the key, or null if there is none and
   *         createIfNotExist is false.
   */
  synchronized Counter get(final Integer key, final boolean createIfNotExist) {
    final Counter existing = map.get(key);
    if (existing != null || !createIfNotExist) {
      return existing;
    }
    final Counter created = new Counter(countResetTimePeriodMs);
    map.put(key, created);
    return created;
  }

  /** Remove all counters. */
  synchronized void clear() {
    map.clear();
  }
}
/**
* Manage byte arrays with the same fixed length.
*
* At most maxAllocated arrays may be handed out at any time; further
* allocate() calls block until an array is recycled. Recycled arrays are
* kept in a free queue for reuse. All methods synchronize on this object.
*/
static class FixedLengthManager {
/** The length of every array managed by this object. */
private final int byteArrayLength;
/** The maximum number of arrays that may be allocated at the same time. */
private final int maxAllocated;
/** Recycled arrays waiting to be handed out again. */
private final Queue<byte[]> freeQueue = new LinkedList<byte[]>();
/** The number of arrays currently handed out by allocate(). */
private int numAllocated = 0;
FixedLengthManager(int arrayLength, int maxAllocated) {
this.byteArrayLength = arrayLength;
this.maxAllocated = maxAllocated;
}
/**
* Allocate a byte array.
*
* If the number of allocated arrays >= maximum, the current thread is
* blocked until the number of allocated arrays drops to below the maximum.
*
* A recycled array is returned as it is; this method does not clear its
* previous content.
*
* The byte array allocated by this method must be returned for recycling
* via the {@link FixedLengthManager#recycle(byte[])} method.
*
* @return a recycled array if the free queue is not empty;
*         otherwise a newly created array.
* @throws InterruptedException if the thread is interrupted while waiting.
*/
synchronized byte[] allocate() throws InterruptedException {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", ").append(this);
}
// re-check the condition after every wake-up since another thread
// may have taken the slot in the meantime
for(; numAllocated >= maxAllocated;) {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(": wait ...");
// flush the message before blocking
logDebugMessage();
}
wait();
if (LOG.isDebugEnabled()) {
debugMessage.get().append("wake up: ").append(this);
}
}
numAllocated++;
// poll() returns null when there is no recycled array to reuse
final byte[] array = freeQueue.poll();
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", recycled? ").append(array != null);
}
return array != null? array : new byte[byteArrayLength];
}
/**
* Recycle the given byte array, which must have the same length as the
* array length managed by this object.
*
* The byte array may or may not be allocated
* by the {@link FixedLengthManager#allocate()} method.
*
* @return the size of the free queue after this call.
* @throws NullPointerException if the array is null.
* @throws IllegalArgumentException if the array has a different length.
*/
synchronized int recycle(byte[] array) {
Preconditions.checkNotNull(array);
Preconditions.checkArgument(array.length == byteArrayLength);
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", ").append(this);
}
// allocate() only blocks while numAllocated >= maxAllocated,
// so waiting threads need to be woken up only in this case
if (numAllocated == maxAllocated) {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", notifyAll");
}
notifyAll();
}
numAllocated--;
if (numAllocated < 0) {
// it is possible to drop below 0 since
// some byte arrays may not be created by the allocate() method.
numAllocated = 0;
}
// keep the array only if free + allocated stays within maxAllocated;
// otherwise the array is simply dropped
if (freeQueue.size() < maxAllocated - numAllocated) {
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", freeQueue.offer");
}
freeQueue.offer(array);
}
return freeQueue.size();
}
/** @return "[length: numAllocated/maxAllocated, free=freeQueueSize]". */
@Override
public synchronized String toString() {
return "[" + byteArrayLength + ": " + numAllocated + "/"
+ maxAllocated + ", free=" + freeQueue.size() + "]";
}
}
/**
 * A synchronized map from array lengths to {@link FixedLengthManager}
 * objects; every manager created here uses the same count limit.
 */
static class ManagerMap {
  private final int countLimit;
  private final Map<Integer, FixedLengthManager> map = new HashMap<Integer, FixedLengthManager>();

  ManagerMap(int countLimit) {
    this.countLimit = countLimit;
  }

  /**
   * Look up the manager for the given array length.
   *
   * @param arrayLength the array length to look up.
   * @param createIfNotExist whether to create and store a new manager
   *          when none exists for the length.
   * @return the manager for the length, or null if there is none and
   *         createIfNotExist is false.
   */
  synchronized FixedLengthManager get(final Integer arrayLength,
      final boolean createIfNotExist) {
    final FixedLengthManager existing = map.get(arrayLength);
    if (existing != null || !createIfNotExist) {
      return existing;
    }
    final FixedLengthManager created =
        new FixedLengthManager(arrayLength, countLimit);
    map.put(arrayLength, created);
    return created;
  }

  /** Remove all managers. */
  synchronized void clear() {
    map.clear();
  }
}
/**
* The configuration values of a {@link ByteArrayManager}.
* The values are stored as given; this class does not validate them.
*/
public static class Conf {
/**
* The count threshold for each array length so that a manager is created
* only after the allocation count exceeds the threshold.
*/
private final int countThreshold;
/**
* The maximum number of arrays allowed for each array length.
*/
private final int countLimit;
/**
* The time period in milliseconds that the allocation count for each array
* length is reset to zero if there is no increment.
*/
private final long countResetTimePeriodMs;
/**
* @param countThreshold see {@link #countThreshold}.
* @param countLimit see {@link #countLimit}.
* @param countResetTimePeriodMs see {@link #countResetTimePeriodMs}.
*/
public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) {
this.countThreshold = countThreshold;
this.countLimit = countLimit;
this.countResetTimePeriodMs = countResetTimePeriodMs;
}
}
/**
* Create a byte array for the given length, where the length of
* the returned array is larger than or equal to the given length.
*
* The current thread may be blocked if some resource is unavailable.
*
* The byte array created by this method must be released
* via the {@link ByteArrayManager#release(byte[])} method.
*
* @param size the minimum length of the array.
* @return a byte array with length larger than or equal to the given length.
* @throws InterruptedException if the thread is interrupted while blocked.
*/
public abstract byte[] newByteArray(int size) throws InterruptedException;
/**
* Release the given byte array.
*
* The byte array may or may not be created
* by the {@link ByteArrayManager#newByteArray(int)} method.
*
* @param array the array to be released.
* @return the number of free arrays. Of the implementations in this file,
*         {@link NewByteArrayWithoutLimit} always returns 0 and
*         {@link Impl} returns -1 when the array is not recycled, i.e. it
*         is empty or no manager exists for its length.
*/
public abstract int release(byte[] array);
/**
 * Create a {@link ByteArrayManager}.
 *
 * @param conf the configuration; if it is null, the returned manager
 *          simply creates new arrays without limit or recycling.
 * @return a {@link NewByteArrayWithoutLimit} for a null conf;
 *         otherwise an {@link Impl} using the given conf.
 */
public static ByteArrayManager newInstance(Conf conf) {
  if (conf == null) {
    return new NewByteArrayWithoutLimit();
  }
  return new Impl(conf);
}
/**
* A dummy implementation which simply calls new byte[].
* It neither limits the number of arrays nor recycles them.
*/
static class NewByteArrayWithoutLimit extends ByteArrayManager {
/** @return a new array with length exactly equal to the given size. */
@Override
public byte[] newByteArray(int size) throws InterruptedException {
return new byte[size];
}
/** Do nothing with the given array. @return 0 always. */
@Override
public int release(byte[] array) {
return 0;
}
}
/**
* Manage byte array allocation and provide a mechanism for recycling the byte
* array objects.
*/
static class Impl extends ByteArrayManager {
private final Conf conf;
private final CounterMap counters;
private final ManagerMap managers;
/**
* Create the counter map and the manager map from the given conf:
* the counters use its count reset time period and
* the managers use its count limit.
*/
Impl(Conf conf) {
this.conf = conf;
this.counters = new CounterMap(conf.countResetTimePeriodMs);
this.managers = new ManagerMap(conf.countLimit);
}
/**
 * Allocate a byte array, where the length of the allocated array
 * is the least power of two of the given length
 * unless the given length is less than {@link #MIN_ARRAY_LENGTH}.
 * In such case, the returned array length is equal to {@link #MIN_ARRAY_LENGTH}.
 * A request for length zero returns the shared {@link #EMPTY_BYTE_ARRAY}.
 *
 * Each call increments the allocation counter of the rounded length.
 * A {@link FixedLengthManager} for that length is created only once the
 * count is above the configured threshold; as long as no manager exists,
 * the array is created directly by new byte[].
 *
 * If a manager exists and the number of allocated arrays has reached the
 * capacity, the current thread is blocked until
 * the number of allocated arrays drops to below the capacity.
 *
 * The byte array allocated by this method must be returned for recycling
 * via the {@link ByteArrayManager#release(byte[])} method.
 *
 * @param arrayLength the minimum array length; must not be negative.
 * @return a byte array with length larger than or equal to the given length.
 * @throws IllegalArgumentException if arrayLength is negative.
 * @throws InterruptedException if the thread is interrupted while blocked.
 */
@Override
public byte[] newByteArray(final int arrayLength) throws InterruptedException {
  // Reject negative lengths before touching the debug buffer or the
  // counters. Without this check a negative length silently yields an
  // array of MIN_ARRAY_LENGTH, while NewByteArrayWithoutLimit fails.
  Preconditions.checkArgument(arrayLength >= 0,
      "arrayLength = %s < 0", arrayLength);

  if (LOG.isDebugEnabled()) {
    debugMessage.get().append("allocate(").append(arrayLength).append(")");
  }

  final byte[] array;
  if (arrayLength == 0) {
    array = EMPTY_BYTE_ARRAY;
  } else {
    final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH?
        MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength);
    final long count = counters.get(powerOfTwo, true).increment();
    final boolean aboveThreshold = count > conf.countThreshold;
    // create a new manager only if the count is above threshold.
    final FixedLengthManager manager = managers.get(powerOfTwo, aboveThreshold);

    if (LOG.isDebugEnabled()) {
      debugMessage.get().append(": count=").append(count)
          .append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
    }
    // without a manager the array is neither limited nor recycled
    array = manager != null? manager.allocate(): new byte[powerOfTwo];
  }

  if (LOG.isDebugEnabled()) {
    logDebugMessage();
  }
  return array;
}
/**
* Recycle the given byte array.
*
* The byte array may or may not be allocated
* by the {@link ByteArrayManager#newByteArray(int)} method.
*
* @param array the array to be recycled; must not be null.
* @return the size of the free queue of the manager for the array length,
*         or -1 if the array is empty or no manager exists for its length.
*/
@Override
public int release(final byte[] array) {
Preconditions.checkNotNull(array);
if (LOG.isDebugEnabled()) {
debugMessage.get().append("recycle: array.length=").append(array.length);
}
final int freeQueueSize;
if (array.length == 0) {
// empty arrays are never managed
freeQueueSize = -1;
} else {
// look up only; a manager is never created on release
final FixedLengthManager manager = managers.get(array.length, false);
freeQueueSize = manager == null? -1: manager.recycle(array);
}
if (LOG.isDebugEnabled()) {
debugMessage.get().append(", freeQueueSize=").append(freeQueueSize);
logDebugMessage();
}
return freeQueueSize;
}
/** @return the map of allocation counters used by this manager. */
CounterMap getCounters() {
return counters;
}
/** @return the map of fixed length managers used by this manager. */
ManagerMap getManagers() {
return managers;
}
}
}

View File

@ -0,0 +1,635 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/**
* Test {@link ByteArrayManager}.
*/
public class TestByteArrayManager {
static {
((Log4JLogger)LogFactory.getLog(ByteArrayManager.class)
).getLogger().setLevel(Level.ALL);
}
static final Log LOG = LogFactory.getLog(TestByteArrayManager.class);
/**
* Compare two futures by their integer results in ascending order.
* Each comparison calls get() on both futures and so blocks until both
* are done; any exception from get() is rethrown as a RuntimeException.
*/
private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
@Override
public int compare(Future<Integer> left, Future<Integer> right) {
try {
// NOTE(review): subtraction can overflow for operands of opposite sign;
// in this file it is only applied to the small positive counts in testCounter
return left.get().intValue() - right.get().intValue();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
/**
* Increment a {@link Counter} n times from a thread pool and verify that
* the returned values are exactly 1..n, then verify that the counter
* starts over after the reset time period has passed.
*/
@Test
public void testCounter() throws Exception {
final long countResetTimePeriodMs = 200L;
final Counter c = new Counter(countResetTimePeriodMs);
// a random number of increments in [512, 1024)
final int n = DFSUtil.getRandom().nextInt(512) + 512;
final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
final ExecutorService pool = Executors.newFixedThreadPool(32);
try {
// increment
for(int i = 0; i < n; i++) {
futures.add(pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return (int)c.increment();
}
}));
}
// sort and wait for the futures
Collections.sort(futures, CMP);
} finally {
pool.shutdown();
}
// check futures: after sorting, the i-th result must be i + 1,
// i.e. every increment returned a distinct value
Assert.assertEquals(n, futures.size());
for(int i = 0; i < n; i++) {
Assert.assertEquals(i + 1, futures.get(i).get().intValue());
}
Assert.assertEquals(n, c.getCount());
// test auto-reset: sleep longer than the reset period so that
// the next increment starts from one again
Thread.sleep(countResetTimePeriodMs + 100);
Assert.assertEquals(1, c.increment());
}
/**
* Walk a {@link ByteArrayManager.Impl} through its phases for a single
* array length: allocations within the count threshold (no manager yet),
* manager creation once the threshold is exceeded, blocking when the
* count limit is reached, and unblocking on recycle.
*/
@Test
public void testAllocateRecycle() throws Exception {
final int countThreshold = 4;
final int countLimit = 8;
final long countResetTimePeriodMs = 200L;
final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs));
final CounterMap counters = bam.getCounters();
final ManagerMap managers = bam.getManagers();
// lengths that are never requested in this test
final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
final int arrayLength = 1024;
final Allocator allocator = new Allocator(bam);
final Recycler recycler = new Recycler(bam);
try {
{ // allocate within threshold
for(int i = 0; i < countThreshold; i++) {
allocator.submit(arrayLength);
}
waitForAll(allocator.futures);
// the count has reached but not exceeded the threshold: no manager yet
Assert.assertEquals(countThreshold,
counters.get(arrayLength, false).getCount());
Assert.assertNull(managers.get(arrayLength, false));
for(int n : uncommonArrays) {
Assert.assertNull(counters.get(n, false));
Assert.assertNull(managers.get(n, false));
}
}
{ // recycle half of the arrays
for(int i = 0; i < countThreshold/2; i++) {
recycler.submit(removeLast(allocator.futures));
}
// release returns -1 when there is no manager for the array length
for(Future<Integer> f : recycler.furtures) {
Assert.assertEquals(-1, f.get().intValue());
}
recycler.furtures.clear();
}
{ // allocate one more
allocator.submit(arrayLength).get();
// the count is now above the threshold so that a manager is created
Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
Assert.assertNotNull(managers.get(arrayLength, false));
}
{ // recycle the remaining arrays
final int n = allocator.recycleAll(recycler);
recycler.verify(n);
}
{
// allocate until the maximum.
for(int i = 0; i < countLimit; i++) {
allocator.submit(arrayLength);
}
waitForAll(allocator.futures);
// allocate one more should be blocked
final AllocatorThread t = new AllocatorThread(arrayLength, bam);
t.start();
// check if the thread is waiting, timed wait or runnable.
for(int i = 0; i < 5; i++) {
Thread.sleep(100);
final Thread.State threadState = t.getState();
if (threadState != Thread.State.RUNNABLE
&& threadState != Thread.State.WAITING
&& threadState != Thread.State.TIMED_WAITING) {
Assert.fail("threadState = " + threadState);
}
}
// recycle an array
recycler.submit(removeLast(allocator.futures));
Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
// check if the thread is unblocked
Thread.sleep(100);
Assert.assertEquals(Thread.State.TERMINATED, t.getState());
// recycle the remaining, the recycle should be full.
Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
recycler.submit(t.array);
recycler.verify(countLimit);
// recycle one more; it should not increase the free queue size
Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
}
} finally {
allocator.pool.shutdown();
recycler.pool.shutdown();
}
}
/**
 * Removes the last future from the given list and waits for its result.
 *
 * @param furtures the list to remove from; it is modified in place
 * @return the result of the removed future, or null if the list is empty
 * @throws Exception if waiting for the removed future fails
 */
static <T> T removeLast(List<Future<T>> furtures) throws Exception {
  final int lastIndex = furtures.size() - 1;
  // remove(..) checks for an empty list before it touches the index,
  // so lastIndex == -1 is harmless here.
  return remove(furtures, lastIndex);
}
/**
 * Removes the future at index i from the given list and waits for its result.
 *
 * @param furtures the list to remove from; it is modified in place
 * @param i the index of the future to remove; ignored when the list is empty
 * @return the result of the removed future, or null if the list is empty
 * @throws Exception if waiting for the removed future fails
 */
static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
  if (furtures.isEmpty()) {
    return null;
  }
  final Future<T> removed = furtures.remove(i);
  return removed.get();
}
/**
 * Blocks until every future in the given list has completed.
 * The results are discarded and the list itself is left untouched.
 *
 * @param furtures the futures to wait for
 * @throws Exception if waiting for any of the futures fails
 */
static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
  for (final Future<T> pending : furtures) {
    pending.get();
  }
}
/**
 * A thread that requests a single byte array of a fixed length from a
 * {@link ByteArrayManager} and keeps the result in {@link #array}.
 * If the request is interrupted, the stack trace is printed and
 * {@link #array} stays null.
 */
static class AllocatorThread extends Thread {
  private final int arrayLength;
  private final ByteArrayManager bam;
  /** The allocated array; null until the allocation has completed. */
  private byte[] array;

  /**
   * @param arrayLength the length to pass to the manager
   * @param bam the manager to allocate from
   */
  AllocatorThread(int arrayLength, ByteArrayManager bam) {
    this.arrayLength = arrayLength;
    this.bam = bam;
  }

  @Override
  public void run() {
    try {
      final byte[] allocated = bam.newByteArray(arrayLength);
      this.array = allocated;
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    }
  }
}
/**
 * Test helper which allocates arrays from a {@link ByteArrayManager}
 * on a pool of 8 threads and remembers the resulting futures.
 */
static class Allocator {
  private final ByteArrayManager bam;
  final ExecutorService pool = Executors.newFixedThreadPool(8);
  /** Futures of all submitted allocations that have not been recycled. */
  final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();

  Allocator(ByteArrayManager bam) {
    this.bam = bam;
  }

  /**
   * Submits an allocation task to the pool and records its future.
   * The task asserts that the returned array has the requested length.
   *
   * @param arrayLength the length to request from the manager
   * @return the future of the allocated array
   */
  Future<byte[]> submit(final int arrayLength) {
    final Callable<byte[]> task = new Callable<byte[]>() {
      @Override
      public byte[] call() throws Exception {
        final byte[] allocated = bam.newByteArray(arrayLength);
        Assert.assertEquals(arrayLength, allocated.length);
        return allocated;
      }
    };
    final Future<byte[]> pending = pool.submit(task);
    futures.add(pending);
    return pending;
  }

  /**
   * Waits for each recorded allocation, hands the array to the given
   * recycler and then forgets all recorded futures.
   *
   * @param recycler the recycler to submit the arrays to
   * @return the number of arrays submitted for recycling
   * @throws Exception if waiting for an allocation fails
   */
  int recycleAll(Recycler recycler) throws Exception {
    final int submitted = futures.size();
    for (final Future<byte[]> pending : futures) {
      final byte[] allocated = pending.get();
      recycler.submit(allocated);
    }
    futures.clear();
    return submitted;
  }
}
/**
 * Test helper which releases arrays to a {@link ByteArrayManager}
 * on a pool of 8 threads and remembers the resulting futures.
 */
static class Recycler {
  private final ByteArrayManager bam;
  final ExecutorService pool = Executors.newFixedThreadPool(8);
  /** Futures holding the values returned by the submitted release calls. */
  final List<Future<Integer>> furtures = new LinkedList<Future<Integer>>();

  Recycler(ByteArrayManager bam) {
    this.bam = bam;
  }

  /**
   * Submits a task releasing the given array and records its future.
   *
   * @param array the array to release
   * @return the future of the value returned by the release call
   */
  Future<Integer> submit(final byte[] array) {
    final Callable<Integer> task = new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        return bam.release(array);
      }
    };
    final Future<Integer> pending = pool.submit(task);
    furtures.add(pending);
    return pending;
  }

  /**
   * Asserts that exactly expectedSize releases are recorded and that,
   * once sorted with CMP, their results are 1, 2, ..., expectedSize.
   * The recorded futures are cleared afterwards.
   * NOTE(review): CMP is declared outside this class; presumably it orders
   * the futures by their integer result -- confirm.
   *
   * @param expectedSize the expected number of recorded futures
   * @throws Exception if waiting for a release result fails
   */
  void verify(final int expectedSize) throws Exception {
    Assert.assertEquals(expectedSize, furtures.size());
    Collections.sort(furtures, CMP);

    int expectedValue = 0;
    for (final Future<Integer> released : furtures) {
      expectedValue++;
      Assert.assertEquals(expectedValue, released.get().intValue());
    }
    furtures.clear();
  }
}
/**
 * Runs {@link Runner#NUM_RUNNERS} runners which concurrently allocate and
 * recycle arrays, together with a thread recycling arrays of randomly
 * chosen runners. Once all runner threads have finished and all their
 * arrays are recycled, it checks for each runner index i >= 1 that
 * no assertion error was recorded and that a manager exists for the
 * runner's array length if and only if the corresponding counter exceeds
 * the count threshold.
 */
@Test
public void testByteArrayManager() throws Exception {
  final int countThreshold = 32;
  final int countLimit = 64;
  final long countResetTimePeriodMs = 1000L;
  final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
      new ByteArrayManager.Conf(
          countThreshold, countLimit, countResetTimePeriodMs));

  final CounterMap counters = bam.getCounters();
  final ManagerMap managers = bam.getManagers();

  final ExecutorService pool = Executors.newFixedThreadPool(128);
  try {
    final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
    final Thread[] threads = new Thread[runners.length];

    final int num = 1 << 8;
    for(int i = 0; i < runners.length; i++) {
      // The runner index is also used as p, which controls how often
      // the runner allocates.
      runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
      threads[i] = runners[i].start(num);
    }

    // A failure raised inside the recycler thread would only terminate
    // that thread.  Keep it here so that it can be reported by the test
    // thread; join() below makes the write visible.
    final Throwable[] recyclerFailure = new Throwable[1];

    final Thread randomRecycler = new Thread() {
      @Override
      public void run() {
        LOG.info("randomRecycler start");
        try {
          for(int i = 0; shouldRun(); i++) {
            final int j = DFSUtil.getRandom().nextInt(runners.length);
            runners[j].recycle();

            // pause on every 256th iteration, including the first one
            if ((i & 0xFF) == 0) {
              sleepMs(100);
            }
          }
        } catch (Throwable t) {
          LOG.error(this + " has " + t, t);
          recyclerFailure[0] = t;
          return;
        }
        LOG.info("randomRecycler done");
      }

      /** Keep running while a runner is alive or still holds arrays. */
      boolean shouldRun() {
        for(int i = 0; i < runners.length; i++) {
          if (threads[i].isAlive()) {
            return true;
          }
          if (!runners[i].isEmpty()) {
            return true;
          }
        }
        return false;
      }
    };
    randomRecycler.start();
    randomRecycler.join();

    if (recyclerFailure[0] != null) {
      Assert.fail("randomRecycler has " + recyclerFailure[0]);
    }

    Assert.assertNull(counters.get(0, false));
    // Runner 0 is created with p == 0 and hence never allocates;
    // only the runners with index >= 1 are checked.
    for(int i = 1; i < runners.length; i++) {
      if (!runners[i].assertionErrors.isEmpty()) {
        for(AssertionError e : runners[i].assertionErrors) {
          LOG.error("AssertionError " + i, e);
        }
        Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
      }

      final int arrayLength = Runner.index2arrayLength(i);
      final boolean exceedCountThreshold
          = counters.get(arrayLength, false).getCount() > countThreshold;
      final FixedLengthManager m = managers.get(arrayLength, false);
      if (exceedCountThreshold) {
        Assert.assertNotNull(m);
      } else {
        Assert.assertNull(m);
      }
    }
  } finally {
    // Release the worker threads even when the test fails.
    pool.shutdown();
  }
}
/**
 * Sleeps for the given number of milliseconds.
 * If the sleep is interrupted, the interrupt status of the current thread
 * is restored, the stack trace is printed and the call fails with an
 * assertion failure.
 *
 * @param ms the time to sleep in milliseconds
 */
static void sleepMs(long ms) {
  try {
    Thread.sleep(ms);
  } catch (InterruptedException e) {
    // Thread.sleep clears the interrupt status when it throws;
    // set it again so that code further up the stack can still see it.
    Thread.currentThread().interrupt();
    e.printStackTrace();
    Assert.fail("Sleep is interrupted: " + e);
  }
}
/**
 * A worker which randomly allocates arrays from, and recycles arrays to,
 * a {@link ByteArrayManager}. The allocations and the recycles are executed
 * as tasks in the given pool. Failures detected in the tasks or in
 * {@link #run()} are collected in {@link #assertionErrors} since an error
 * thrown in those threads cannot fail the test directly.
 */
static class Runner implements Runnable {
  static final int NUM_RUNNERS = 4;

  /**
   * @return {@link ByteArrayManager#MIN_ARRAY_LENGTH} shifted left
   *         by (index - 1).  For index 0 the shift distance is -1,
   *         so the result is only meaningful for index >= 1.
   */
  static int index2arrayLength(int index) {
    return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
  }

  private final ByteArrayManager bam;
  final int maxArrayLength;
  final int countThreshold;
  final int maxArrays;
  final ExecutorService pool;
  /** Futures of the allocated arrays; guarded by synchronized (arrays). */
  final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();

  /** Number of submitted allocations minus number of submitted recycles. */
  final AtomicInteger count = new AtomicInteger();
  /** An iteration of run() allocates with probability p/NUM_RUNNERS. */
  final int p;
  /** Number of iterations of run(); set by start(int). */
  private int n;

  /**
   * Errors collected from the pool tasks and from run().
   * It is a synchronized list since the allocation tasks add to it
   * concurrently from multiple pool threads.
   */
  final List<AssertionError> assertionErrors
      = Collections.synchronizedList(new ArrayList<AssertionError>());

  Runner(int index, int countThreshold, int maxArrays,
      ExecutorService pool, int p, ByteArrayManager bam) {
    this.maxArrayLength = index2arrayLength(index);
    this.countThreshold = countThreshold;
    this.maxArrays = maxArrays;
    this.pool = pool;
    this.p = p;
    this.bam = bam;
  }

  /** @return true if this runner currently holds no allocation future. */
  boolean isEmpty() {
    synchronized (arrays) {
      return arrays.isEmpty();
    }
  }

  /**
   * Submits a task which allocates an array with a random length in
   * (lower, maxArrayLength], where lower is 0 if maxArrayLength is
   * the minimum array length and maxArrayLength/2 otherwise.
   * The task expects the length of the returned array to be
   * maxArrayLength; a mismatch is recorded in {@link #assertionErrors}.
   *
   * @return the future of the allocated array
   */
  Future<byte[]> submitAllocate() {
    count.incrementAndGet();

    final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
      @Override
      public byte[] call() throws Exception {
        final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
            0: maxArrayLength >> 1;
        final int arrayLength = DFSUtil.getRandom().nextInt(
            maxArrayLength - lower) + lower + 1;
        final byte[] array = bam.newByteArray(arrayLength);
        try {
          Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
              maxArrayLength, array.length);
        } catch(AssertionError e) {
          assertionErrors.add(e);
        }
        return array;
      }
    });
    synchronized (arrays) {
      arrays.add(f);
    }
    return f;
  }

  /**
   * Removes the oldest allocation future and waits for its array.
   *
   * @return the array, or null if this runner holds no allocation future
   */
  byte[] removeFirst() throws Exception {
    synchronized (arrays) {
      return remove(arrays, 0);
    }
  }

  /** Releases the oldest allocated array, if any, in the calling thread. */
  void recycle() throws Exception {
    final byte[] a = removeFirst();
    if (a != null) {
      recycle(a);
    }
  }

  /** Releases the given array and returns the value from the manager. */
  int recycle(final byte[] array) {
    return bam.release(array);
  }

  /** Submits a task to the pool which releases the given array. */
  Future<Integer> submitRecycle(final byte[] array) {
    count.decrementAndGet();

    final Future<Integer> f = pool.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        return recycle(array);
      }
    });
    return f;
  }

  /**
   * Performs n iterations; each of them either submits an allocation or
   * submits a recycle of the oldest allocated array, if there is one.
   */
  @Override
  public void run() {
    for(int i = 0; i < n; i++) {
      final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p;
      if (isAllocate) {
        submitAllocate();
      } else {
        try {
          final byte[] a = removeFirst();
          if (a != null) {
            submitRecycle(a);
          }
        } catch (Exception e) {
          e.printStackTrace();
          // Failing an assertion here would only end this thread without
          // the test noticing.  Record the failure for the test and stop.
          assertionErrors.add(new AssertionError(this + " has " + e));
          return;
        }
      }

      // pause on every 256th iteration, including the first one
      if ((i & 0xFF) == 0) {
        sleepMs(100);
      }
    }
  }

  /**
   * Starts a new thread executing {@link #run()} with n iterations.
   *
   * @param n the number of iterations
   * @return the started thread
   */
  Thread start(int n) {
    this.n = n;
    final Thread t = new Thread(this);
    t.start();
    return t;
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + ": max=" + maxArrayLength
        + ", count=" + count;
  }
}
/**
 * A {@link ByteArrayManager} which always creates new arrays but allows
 * at most maxCount arrays to be outstanding at a time.
 * It never reuses a released array.
 */
static class NewByteArrayWithLimit extends ByteArrayManager {
  private final int maxCount;
  /** Number of arrays created but not yet released. */
  private int count = 0;

  NewByteArrayWithLimit(int maxCount) {
    this.maxCount = maxCount;
  }

  /**
   * Creates a new array of the given size, waiting first while
   * the number of outstanding arrays is at the limit.
   */
  @Override
  public synchronized byte[] newByteArray(int size) throws InterruptedException {
    while (count >= maxCount) {
      wait();
    }
    count++;
    return new byte[size];
  }

  /**
   * Decrements the number of outstanding arrays and wakes up the waiting
   * callers if the limit had been reached.  The array itself is dropped.
   *
   * @return always 0
   */
  @Override
  public synchronized int release(byte[] array) {
    final boolean wasAtLimit = count == maxCount;
    count--;
    if (wasAtLimit) {
      // The waiters cannot proceed before this method gives up the monitor,
      // so they observe the decremented count.
      notifyAll();
    }
    return 0;
  }
}
/**
 * Runs a simple benchmark over three {@link ByteArrayManager}
 * implementations.  For each implementation, it prints the elapsed time of
 * each trial, the average in seconds and the percentage differences
 * from the averages of the implementations measured before it.
 */
public static void main(String[] args) throws Exception {
  // turn off the logging of ByteArrayManager
  final Log4JLogger bamLog
      = (Log4JLogger)LogFactory.getLog(ByteArrayManager.class);
  bamLog.getLogger().setLevel(Level.OFF);

  final int arrayLength = 64 * 1024; //64k
  final int nThreads = 512;
  final int nAllocations = 1 << 15;
  final int maxArrays = 1 << 10;
  final int nTrials = 5;

  final String parameters = "arrayLength=" + arrayLength
      + ", nThreads=" + nThreads
      + ", nAllocations=" + nAllocations
      + ", maxArrays=" + maxArrays;
  System.out.println(parameters);

  final Random ran = DFSUtil.getRandom();
  final ByteArrayManager[] impls = {
      new ByteArrayManager.NewByteArrayWithoutLimit(),
      new NewByteArrayWithLimit(maxArrays),
      new ByteArrayManager.Impl(new ByteArrayManager.Conf(
          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT,
          maxArrays,
          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
  };
  final double[] avg = new double[impls.length];

  for (int implIndex = 0; implIndex < impls.length; implIndex++) {
    final ByteArrayManager impl = impls[implIndex];
    printf("%26s:", impl.getClass().getSimpleName());

    double totalMs = 0;
    for (int trial = 0; trial < nTrials; trial++) {
      // a fresh set of random sleep times, each below 100ms, per trial
      final int[] sleepTime = new int[nAllocations];
      for (int k = 0; k < sleepTime.length; k++) {
        sleepTime[k] = ran.nextInt(100);
      }

      final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
          sleepTime, impl);
      totalMs += elapsed;
      printf("%5d, ", elapsed);
    }

    avg[implIndex] = totalMs/nTrials;
    printf("avg=%6.3fs", avg[implIndex]/1000);

    // compare with all the implementations measured earlier
    for (int earlier = 0; earlier < implIndex; earlier++) {
      printf(" (%6.2f%%)", percentageDiff(avg[earlier], avg[implIndex]));
    }
    printf("\n");
  }
}
/**
 * Computes the change from original to newValue as a percentage
 * of original.
 *
 * @param original the reference value
 * @param newValue the value compared against the reference
 * @return (newValue - original)/original*100
 */
static double percentageDiff(double original, double newValue) {
  final double difference = newValue - original;
  final double ratio = difference / original;
  return ratio * 100;
}
/**
 * Prints a formatted string to the standard output and flushes it
 * right away so that partial lines become visible immediately.
 *
 * @param format the format string
 * @param args the arguments referenced by the format string
 */
static void printf(String format, Object... args) {
  // printf returns the stream it was invoked on
  System.out.printf(format, args).flush();
}
/**
 * Measures the time used to execute one task per entry of sleepTimeMSs
 * in a pool of nThreads threads.  Each task allocates an array from
 * the given implementation, sleeps for its entry of sleepTimeMSs and
 * then releases the array.
 *
 * @param arrayLength the length of the arrays to allocate
 * @param maxArrays not used by this method
 * @param nThreads the number of threads in the pool
 * @param sleepTimeMSs the sleep time in milliseconds of each task
 * @param impl the implementation under test
 * @return the elapsed time in milliseconds
 * @throws Exception if waiting for any of the tasks fails
 */
static long performanceTest(final int arrayLength, final int maxArrays,
    final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
    throws Exception {
  final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
  try {
    final List<Future<Void>> futures
        = new ArrayList<Future<Void>>(sleepTimeMSs.length);
    final long startTime = Time.monotonicNow();

    for(int i = 0; i < sleepTimeMSs.length; i++) {
      final long sleepTime = sleepTimeMSs[i];
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          final byte[] array = impl.newByteArray(arrayLength);
          try {
            sleepMs(sleepTime);
          } finally {
            // Give the array back even if the sleep fails; otherwise,
            // an implementation with a limit may block the other tasks.
            impl.release(array);
          }
          return null;
        }
      }));
    }
    for(Future<Void> f : futures) {
      f.get();
    }

    final long endTime = Time.monotonicNow();
    return endTime - startTime;
  } finally {
    // Do not leak the worker threads when a task fails.
    pool.shutdown();
  }
}
}