From bac064de6298370a96956abfae419aa9ba70983b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 1 Nov 2014 11:22:13 -0700 Subject: [PATCH] HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and provide a mechanism for recycling arrays. Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/ClientContext.java | 10 + .../org/apache/hadoop/hdfs/DFSClient.java | 30 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 21 +- .../apache/hadoop/hdfs/DFSOutputStream.java | 80 ++- .../hadoop/hdfs/util/ByteArrayManager.java | 419 ++++++++++++ .../hdfs/util/TestByteArrayManager.java | 635 ++++++++++++++++++ 7 files changed, 1166 insertions(+), 32 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a5ab23978cd..7433dbe38bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED HDFS-7313. Support optional configuration of AES cipher suite on DataTransferProtocol. (cnauroth) + HDFS-7276. Limit the number of byte arrays used by DFSOutputStream and + provide a mechanism for recycling arrays. (szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index f55aff5fd3a..e106fca55d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.util.ByteArrayManager; import com.google.common.annotations.VisibleForTesting; @@ -84,6 +85,9 @@ public class ClientContext { */ private volatile boolean disableLegacyBlockReaderLocal = false; + /** Creating byte[] for {@link DFSOutputStream}. */ + private final ByteArrayManager byteArrayManager; + /** * Whether or not we complained about a DFSClient fetching a CacheContext that * didn't match its config values yet. @@ -105,6 +109,8 @@ public class ClientContext { new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; this.domainSocketFactory = new DomainSocketFactory(conf); + + this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); } public static String confAsString(Conf conf) { @@ -204,4 +210,8 @@ public class ClientContext { public DomainSocketFactory getDomainSocketFactory() { return domainSocketFactory; } + + public ByteArrayManager getByteArrayManager() { + return byteArrayManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 902fc1c4f5b..4d6de34bd99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -56,8 +56,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; @@ -195,6 +193,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; @@ -301,6 +300,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final ChecksumOpt defaultChecksumOpt; final int writePacketSize; final int writeMaxPackets; + final ByteArrayManager.Conf writeByteArrayManagerConf; final int socketTimeout; final int socketCacheCapacity; final long socketCacheExpiry; @@ -371,8 +371,30 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, /** dfs.write.packet.size is an internal config variable */ writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY, - DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT); + writeMaxPackets = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT); + + final boolean byteArrayManagerEnabled = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT); + if (!byteArrayManagerEnabled) { + writeByteArrayManagerConf = null; + } else { + final int countThreshold = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT); + final int countLimit = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT); + final long countResetTimePeriodMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT); + writeByteArrayManagerConf = new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs); + } + + defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); defaultReplication = (short) conf.getInt( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 4982c7128aa..fd313bb96cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -50,10 +50,27 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; - public static final String DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets"; - public static final int DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80; + public static final String DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY = "dfs.client.write.max-packets-in-flight"; + public static final int DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT = 80; public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY + = "dfs.client.write.byte-array-manager.enabled"; + public static final boolean DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT + = false; + public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY + = "dfs.client.write.byte-array-manager.count-threshold"; + public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT + = 128; + public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY + = "dfs.client.write.byte-array-manager.count-limit"; + public static final int DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT + = 2048; + public static final String DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY + = "dfs.client.write.byte-array-manager.count-reset-time-period-ms"; + public static final long DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT + = 10L * 1000; + public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable"; public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index a83c854d035..6cbf27a7099 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -41,10 +41,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.base.Preconditions; - import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSOutputSummer; @@ -55,7 +54,6 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; -import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -83,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; @@ -99,6 +98,7 @@ import org.htrace.Trace; import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -143,6 +143,7 @@ public class DFSOutputStream extends FSOutputSummer CryptoProtocolVersion.supported(); private final DFSClient dfsClient; + private final ByteArrayManager byteArrayManager; private Socket s; // closed is accessed by different threads under different locks. private volatile boolean closed = false; @@ -181,6 +182,33 @@ public class DFSOutputStream extends FSOutputSummer private static final BlockStoragePolicySuite blockStoragePolicySuite = BlockStoragePolicySuite.createDefaultSuite(); + /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ + private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, + long seqno) throws InterruptedIOException { + final byte[] buf; + final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; + + try { + buf = byteArrayManager.newByteArray(bufferSize); + } catch (InterruptedException ie) { + final InterruptedIOException iioe = new InterruptedIOException( + "seqno=" + seqno); + iioe.initCause(ie); + throw iioe; + } + + return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize()); + } + + /** + * For heartbeat packets, create buffer directly by new byte[] + * since heartbeats should not be blocked. + */ + private Packet createHeartbeatPacket() throws InterruptedIOException { + final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; + return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize()); + } + private static class Packet { private static final long HEART_BEAT_SEQNO = -1L; long seqno; // sequencenumber of buffer in block @@ -188,7 +216,7 @@ public class DFSOutputStream extends FSOutputSummer boolean syncBlock; // this packet forces the current block to disk int numChunks; // number of chunks currently in packet final int maxChunks; // max chunks in packet - final byte[] buf; + private byte[] buf; private boolean lastPacketInBlock; // is this the last packet in block? /** @@ -210,13 +238,6 @@ public class DFSOutputStream extends FSOutputSummer final int dataStart; int dataPos; - /** - * Create a heartbeat packet. - */ - Packet(int checksumSize) { - this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize); - } - /** * Create a new packet. * @@ -225,15 +246,15 @@ public class DFSOutputStream extends FSOutputSummer * @param chunksPerPkt maximum number of chunks per packet. * @param offsetInBlock offset in bytes into the HDFS block. */ - Packet(int pktSize, int chunksPerPkt, long offsetInBlock, - long seqno, int checksumSize) { + private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + int checksumSize) { this.lastPacketInBlock = false; this.numChunks = 0; this.offsetInBlock = offsetInBlock; this.seqno = seqno; - - buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize]; - + + this.buf = buf; + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumPos = checksumStart; dataStart = checksumStart + (chunksPerPkt * checksumSize); @@ -304,6 +325,11 @@ public class DFSOutputStream extends FSOutputSummer buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; } } + + private void releaseBuffer(ByteArrayManager bam) { + bam.release(buf); + buf = null; + } // get the packet's last byte's offset in the block long getLastByteOffsetBlock() { @@ -547,7 +573,7 @@ public class DFSOutputStream extends FSOutputSummer } // get packet to be sent. if (dataQueue.isEmpty()) { - one = new Packet(getChecksumSize()); // heartbeat packet + one = createHeartbeatPacket(); } else { one = dataQueue.getFirst(); // regular data packet } @@ -907,6 +933,8 @@ public class DFSOutputStream extends FSOutputSummer lastAckedSeqno = seqno; ackQueue.removeFirst(); dataQueue.notifyAll(); + + one.releaseBuffer(byteArrayManager); } } catch (Exception e) { if (!responderClosed) { @@ -1657,6 +1685,7 @@ public class DFSOutputStream extends FSOutputSummer this.dfsclientSlowLogThresholdMs = dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; + this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); } /** Construct a new output stream for creating a file. */ @@ -1836,8 +1865,8 @@ public class DFSOutputStream extends FSOutputSummer } if (currentPacket == null) { - currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, getChecksumSize()); + currentPacket = createPacket(packetSize, chunksPerPacket, + bytesCurBlock, currentSeqno++); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + @@ -1884,8 +1913,7 @@ public class DFSOutputStream extends FSOutputSummer // indicate the end of block and reset bytesCurBlock. // if (bytesCurBlock == blockSize) { - currentPacket = new Packet(0, 0, bytesCurBlock, - currentSeqno++, getChecksumSize()); + currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); @@ -1978,8 +2006,8 @@ public class DFSOutputStream extends FSOutputSummer // Nothing to send right now, // but sync was requested. // Send an empty packet - currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, getChecksumSize()); + currentPacket = createPacket(packetSize, chunksPerPacket, + bytesCurBlock, currentSeqno++); } } else { if (isSync && bytesCurBlock > 0) { @@ -1987,8 +2015,8 @@ public class DFSOutputStream extends FSOutputSummer // and the block was partially written, // and sync was requested. // So send an empty sync packet. - currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, getChecksumSize()); + currentPacket = createPacket(packetSize, chunksPerPacket, + bytesCurBlock, currentSeqno++); } else { // just discard the current packet since it is already been sent. currentPacket = null; @@ -2192,7 +2220,7 @@ public class DFSOutputStream extends FSOutputSummer if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize()); + currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java new file mode 100644 index 00000000000..4751e72800e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; + +/** + * Manage byte array creation and release. + */ +@InterfaceAudience.Private +public abstract class ByteArrayManager { + static final Log LOG = LogFactory.getLog(ByteArrayManager.class); + private static final ThreadLocal debugMessage = new ThreadLocal() { + protected StringBuilder initialValue() { + return new StringBuilder(); + } + }; + + private static void logDebugMessage() { + final StringBuilder b = debugMessage.get(); + LOG.debug(b); + b.setLength(0); + } + + static final int MIN_ARRAY_LENGTH = 32; + static final byte[] EMPTY_BYTE_ARRAY = {}; + + /** + * @return the least power of two greater than or equal to n, i.e. return + * the least integer x with x >= n and x a power of two. + * + * @throws HadoopIllegalArgumentException + * if n <= 0. + */ + public static int leastPowerOfTwo(final int n) { + if (n <= 0) { + throw new HadoopIllegalArgumentException("n = " + n + " <= 0"); + } + + final int highestOne = Integer.highestOneBit(n); + if (highestOne == n) { + return n; // n is a power of two. + } + final int roundUp = highestOne << 1; + if (roundUp < 0) { + final long overflow = ((long) highestOne) << 1; + throw new ArithmeticException( + "Overflow: for n = " + n + ", the least power of two (the least" + + " integer x with x >= n and x a power of two) = " + + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE); + } + return roundUp; + } + + /** + * A counter with a time stamp so that it is reset automatically + * if there is no increment for the time period. + */ + static class Counter { + private final long countResetTimePeriodMs; + private long count = 0L; + private long timestamp = Time.monotonicNow(); + + Counter(long countResetTimePeriodMs) { + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + + synchronized long getCount() { + return count; + } + + /** + * Increment the counter, and reset it if there is no increment + * for acertain time period. + * + * @return the new count. + */ + synchronized long increment() { + final long now = Time.monotonicNow(); + if (now - timestamp > countResetTimePeriodMs) { + count = 0; // reset the counter + } + timestamp = now; + return ++count; + } + } + + /** A map from integers to counters. */ + static class CounterMap { + /** @see ByteArrayManager.Conf#countResetTimePeriodMs */ + private final long countResetTimePeriodMs; + private final Map map = new HashMap(); + + private CounterMap(long countResetTimePeriodMs) { + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + + /** + * @return the counter for the given key; + * and create a new counter if it does not exist. + */ + synchronized Counter get(final Integer key, final boolean createIfNotExist) { + Counter count = map.get(key); + if (count == null && createIfNotExist) { + count = new Counter(countResetTimePeriodMs); + map.put(key, count); + } + return count; + } + + synchronized void clear() { + map.clear(); + } + } + + /** Manage byte arrays with the same fixed length. */ + static class FixedLengthManager { + private final int byteArrayLength; + private final int maxAllocated; + private final Queue freeQueue = new LinkedList(); + + private int numAllocated = 0; + + FixedLengthManager(int arrayLength, int maxAllocated) { + this.byteArrayLength = arrayLength; + this.maxAllocated = maxAllocated; + } + + /** + * Allocate a byte array. + * + * If the number of allocated arrays >= maximum, the current thread is + * blocked until the number of allocated arrays drops to below the maximum. + * + * The byte array allocated by this method must be returned for recycling + * via the {@link FixedLengthManager#recycle(byte[])} method. + */ + synchronized byte[] allocate() throws InterruptedException { + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", ").append(this); + } + for(; numAllocated >= maxAllocated;) { + if (LOG.isDebugEnabled()) { + debugMessage.get().append(": wait ..."); + logDebugMessage(); + } + + wait(); + + if (LOG.isDebugEnabled()) { + debugMessage.get().append("wake up: ").append(this); + } + } + numAllocated++; + + final byte[] array = freeQueue.poll(); + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", recycled? ").append(array != null); + } + return array != null? array : new byte[byteArrayLength]; + } + + /** + * Recycle the given byte array, which must have the same length as the + * array length managed by this object. + * + * The byte array may or may not be allocated + * by the {@link FixedLengthManager#allocate()} method. + */ + synchronized int recycle(byte[] array) { + Preconditions.checkNotNull(array); + Preconditions.checkArgument(array.length == byteArrayLength); + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", ").append(this); + } + + if (numAllocated == maxAllocated) { + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", notifyAll"); + } + notifyAll(); + } + numAllocated--; + if (numAllocated < 0) { + // it is possible to drop below 0 since + // some byte arrays may not be created by the allocate() method. + numAllocated = 0; + } + + if (freeQueue.size() < maxAllocated - numAllocated) { + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", freeQueue.offer"); + } + freeQueue.offer(array); + } + return freeQueue.size(); + } + + @Override + public synchronized String toString() { + return "[" + byteArrayLength + ": " + numAllocated + "/" + + maxAllocated + ", free=" + freeQueue.size() + "]"; + } + } + + /** A map from array lengths to byte array managers. */ + static class ManagerMap { + private final int countLimit; + private final Map map = new HashMap(); + + ManagerMap(int countLimit) { + this.countLimit = countLimit; + } + + /** @return the manager for the given array length. */ + synchronized FixedLengthManager get(final Integer arrayLength, + final boolean createIfNotExist) { + FixedLengthManager manager = map.get(arrayLength); + if (manager == null && createIfNotExist) { + manager = new FixedLengthManager(arrayLength, countLimit); + map.put(arrayLength, manager); + } + return manager; + } + + synchronized void clear() { + map.clear(); + } + } + + public static class Conf { + /** + * The count threshold for each array length so that a manager is created + * only after the allocation count exceeds the threshold. + */ + private final int countThreshold; + /** + * The maximum number of arrays allowed for each array length. + */ + private final int countLimit; + /** + * The time period in milliseconds that the allocation count for each array + * length is reset to zero if there is no increment. + */ + private final long countResetTimePeriodMs; + + public Conf(int countThreshold, int countLimit, long countResetTimePeriodMs) { + this.countThreshold = countThreshold; + this.countLimit = countLimit; + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + } + + /** + * Create a byte array for the given length, where the length of + * the returned array is larger than or equal to the given length. + * + * The current thread may be blocked if some resource is unavailable. + * + * The byte array created by this method must be released + * via the {@link ByteArrayManager#release(byte[])} method. + * + * @return a byte array with length larger than or equal to the given length. + */ + public abstract byte[] newByteArray(int size) throws InterruptedException; + + /** + * Release the given byte array. + * + * The byte array may or may not be created + * by the {@link ByteArrayManager#newByteArray(int)} method. + * + * @return the number of free array. + */ + public abstract int release(byte[] array); + + public static ByteArrayManager newInstance(Conf conf) { + return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf); + } + + /** + * A dummy implementation which simply calls new byte[]. + */ + static class NewByteArrayWithoutLimit extends ByteArrayManager { + @Override + public byte[] newByteArray(int size) throws InterruptedException { + return new byte[size]; + } + + @Override + public int release(byte[] array) { + return 0; + } + } + + /** + * Manage byte array allocation and provide a mechanism for recycling the byte + * array objects. + */ + static class Impl extends ByteArrayManager { + private final Conf conf; + + private final CounterMap counters; + private final ManagerMap managers; + + Impl(Conf conf) { + this.conf = conf; + this.counters = new CounterMap(conf.countResetTimePeriodMs); + this.managers = new ManagerMap(conf.countLimit); + } + + /** + * Allocate a byte array, where the length of the allocated array + * is the least power of two of the given length + * unless the given length is less than {@link #MIN_ARRAY_LENGTH}. + * In such case, the returned array length is equal to {@link #MIN_ARRAY_LENGTH}. + * + * If the number of allocated arrays exceeds the capacity, + * the current thread is blocked until + * the number of allocated arrays drops to below the capacity. + * + * The byte array allocated by this method must be returned for recycling + * via the {@link ByteArrayManager#recycle(byte[])} method. + * + * @return a byte array with length larger than or equal to the given length. + */ + @Override + public byte[] newByteArray(final int arrayLength) throws InterruptedException { + if (LOG.isDebugEnabled()) { + debugMessage.get().append("allocate(").append(arrayLength).append(")"); + } + + final byte[] array; + if (arrayLength == 0) { + array = EMPTY_BYTE_ARRAY; + } else { + final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH? + MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength); + final long count = counters.get(powerOfTwo, true).increment(); + final boolean aboveThreshold = count > conf.countThreshold; + // create a new manager only if the count is above threshold. + final FixedLengthManager manager = managers.get(powerOfTwo, aboveThreshold); + + if (LOG.isDebugEnabled()) { + debugMessage.get().append(": count=").append(count) + .append(aboveThreshold? ", aboveThreshold": ", belowThreshold"); + } + array = manager != null? manager.allocate(): new byte[powerOfTwo]; + } + + if (LOG.isDebugEnabled()) { + logDebugMessage(); + } + return array; + } + + /** + * Recycle the given byte array. + * + * The byte array may or may not be allocated + * by the {@link ByteArrayManager#allocate(int)} method. + */ + @Override + public int release(final byte[] array) { + Preconditions.checkNotNull(array); + if (LOG.isDebugEnabled()) { + debugMessage.get().append("recycle: array.length=").append(array.length); + } + + final int freeQueueSize; + if (array.length == 0) { + freeQueueSize = -1; + } else { + final FixedLengthManager manager = managers.get(array.length, false); + freeQueueSize = manager == null? -1: manager.recycle(array); + } + + if (LOG.isDebugEnabled()) { + debugMessage.get().append(", freeQueueSize=").append(freeQueueSize); + logDebugMessage(); + } + return freeQueueSize; + } + + CounterMap getCounters() { + return counters; + } + + ManagerMap getManagers() { + return managers; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java new file mode 100644 index 00000000000..289617a6b92 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java @@ -0,0 +1,635 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter; +import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap; +import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager; +import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap; +import org.apache.hadoop.util.Time; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test {@link ByteArrayManager}. + */ +public class TestByteArrayManager { + static { + ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class) + ).getLogger().setLevel(Level.ALL); + } + + static final Log LOG = LogFactory.getLog(TestByteArrayManager.class); + + private static final Comparator> CMP = new Comparator>() { + @Override + public int compare(Future left, Future right) { + try { + return left.get().intValue() - right.get().intValue(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + @Test + public void testCounter() throws Exception { + final long countResetTimePeriodMs = 200L; + final Counter c = new Counter(countResetTimePeriodMs); + + final int n = DFSUtil.getRandom().nextInt(512) + 512; + final List> futures = new ArrayList>(n); + + final ExecutorService pool = Executors.newFixedThreadPool(32); + try { + // increment + for(int i = 0; i < n; i++) { + futures.add(pool.submit(new Callable() { + @Override + public Integer call() throws Exception { + return (int)c.increment(); + } + })); + } + + // sort and wait for the futures + Collections.sort(futures, CMP); + } finally { + pool.shutdown(); + } + + // check futures + Assert.assertEquals(n, futures.size()); + for(int i = 0; i < n; i++) { + Assert.assertEquals(i + 1, futures.get(i).get().intValue()); + } + Assert.assertEquals(n, c.getCount()); + + // test auto-reset + Thread.sleep(countResetTimePeriodMs + 100); + Assert.assertEquals(1, c.increment()); + } + + + + @Test + public void testAllocateRecycle() throws Exception { + final int countThreshold = 4; + final int countLimit = 8; + final long countResetTimePeriodMs = 200L; + final ByteArrayManager.Impl bam = new ByteArrayManager.Impl( + new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs)); + + final CounterMap counters = bam.getCounters(); + final ManagerMap managers = bam.getManagers(); + + final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64}; + final int arrayLength = 1024; + + + final Allocator allocator = new Allocator(bam); + final Recycler recycler = new Recycler(bam); + try { + { // allocate within threshold + for(int i = 0; i < countThreshold; i++) { + allocator.submit(arrayLength); + } + waitForAll(allocator.futures); + + Assert.assertEquals(countThreshold, + counters.get(arrayLength, false).getCount()); + Assert.assertNull(managers.get(arrayLength, false)); + for(int n : uncommonArrays) { + Assert.assertNull(counters.get(n, false)); + Assert.assertNull(managers.get(n, false)); + } + } + + { // recycle half of the arrays + for(int i = 0; i < countThreshold/2; i++) { + recycler.submit(removeLast(allocator.futures)); + } + + for(Future f : recycler.furtures) { + Assert.assertEquals(-1, f.get().intValue()); + } + recycler.furtures.clear(); + } + + { // allocate one more + allocator.submit(arrayLength).get(); + + Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount()); + Assert.assertNotNull(managers.get(arrayLength, false)); + } + + { // recycle the remaining arrays + final int n = allocator.recycleAll(recycler); + + recycler.verify(n); + } + + { + // allocate until the maximum. + for(int i = 0; i < countLimit; i++) { + allocator.submit(arrayLength); + } + waitForAll(allocator.futures); + + // allocate one more should be blocked + final AllocatorThread t = new AllocatorThread(arrayLength, bam); + t.start(); + + // check if the thread is waiting, timed wait or runnable. + for(int i = 0; i < 5; i++) { + Thread.sleep(100); + final Thread.State threadState = t.getState(); + if (threadState != Thread.State.RUNNABLE + && threadState != Thread.State.WAITING + && threadState != Thread.State.TIMED_WAITING) { + Assert.fail("threadState = " + threadState); + } + } + + // recycle an array + recycler.submit(removeLast(allocator.futures)); + Assert.assertEquals(1, removeLast(recycler.furtures).intValue()); + + // check if the thread is unblocked + Thread.sleep(100); + Assert.assertEquals(Thread.State.TERMINATED, t.getState()); + + // recycle the remaining, the recycle should be full. + Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler)); + recycler.submit(t.array); + recycler.verify(countLimit); + + // recycle one more; it should not increase the free queue size + Assert.assertEquals(countLimit, bam.release(new byte[arrayLength])); + } + } finally { + allocator.pool.shutdown(); + recycler.pool.shutdown(); + } + } + + static T removeLast(List> furtures) throws Exception { + return remove(furtures, furtures.size() - 1); + } + static T remove(List> furtures, int i) throws Exception { + return furtures.isEmpty()? null: furtures.remove(i).get(); + } + + static void waitForAll(List> furtures) throws Exception { + for(Future f : furtures) { + f.get(); + } + } + + static class AllocatorThread extends Thread { + private final ByteArrayManager bam; + private final int arrayLength; + private byte[] array; + + AllocatorThread(int arrayLength, ByteArrayManager bam) { + this.bam = bam; + this.arrayLength = arrayLength; + } + + @Override + public void run() { + try { + array = bam.newByteArray(arrayLength); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + static class Allocator { + private final ByteArrayManager bam; + final ExecutorService pool = Executors.newFixedThreadPool(8); + final List> futures = new LinkedList>(); + + Allocator(ByteArrayManager bam) { + this.bam = bam; + } + + Future submit(final int arrayLength) { + final Future f = pool.submit(new Callable() { + @Override + public byte[] call() throws Exception { + final byte[] array = bam.newByteArray(arrayLength); + Assert.assertEquals(arrayLength, array.length); + return array; + } + }); + futures.add(f); + return f; + } + + int recycleAll(Recycler recycler) throws Exception { + final int n = futures.size(); + for(Future f : futures) { + recycler.submit(f.get()); + } + futures.clear(); + return n; + } + } + + static class Recycler { + private final ByteArrayManager bam; + final ExecutorService pool = Executors.newFixedThreadPool(8); + final List> furtures = new LinkedList>(); + + Recycler(ByteArrayManager bam) { + this.bam = bam; + } + + Future submit(final byte[] array) { + final Future f = pool.submit(new Callable() { + @Override + public Integer call() throws Exception { + return bam.release(array); + } + }); + furtures.add(f); + return f; + } + + void verify(final int expectedSize) throws Exception { + Assert.assertEquals(expectedSize, furtures.size()); + Collections.sort(furtures, CMP); + for(int i = 0; i < furtures.size(); i++) { + Assert.assertEquals(i+1, furtures.get(i).get().intValue()); + } + furtures.clear(); + } + } + + + @Test + public void testByteArrayManager() throws Exception { + final int countThreshold = 32; + final int countLimit = 64; + final long countResetTimePeriodMs = 1000L; + final ByteArrayManager.Impl bam = new ByteArrayManager.Impl( + new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs)); + + final CounterMap counters = bam.getCounters(); + final ManagerMap managers = bam.getManagers(); + + final ExecutorService pool = Executors.newFixedThreadPool(128); + + final Runner[] runners = new Runner[Runner.NUM_RUNNERS]; + final Thread[] threads = new Thread[runners.length]; + + final int num = 1 << 8; + for(int i = 0; i < runners.length; i++) { + runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam); + threads[i] = runners[i].start(num); + } + + final Thread randomRecycler = new Thread() { + @Override + public void run() { + LOG.info("randomRecycler start"); + for(int i = 0; shouldRun(); i++) { + final int j = DFSUtil.getRandom().nextInt(runners.length); + try { + runners[j].recycle(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(this + " has " + e); + } + + if ((i & 0xFF) == 0) { + sleepMs(100); + } + } + LOG.info("randomRecycler done"); + } + + boolean shouldRun() { + for(int i = 0; i < runners.length; i++) { + if (threads[i].isAlive()) { + return true; + } + if (!runners[i].isEmpty()) { + return true; + } + } + return false; + } + }; + randomRecycler.start(); + + randomRecycler.join(); + + Assert.assertNull(counters.get(0, false)); + for(int i = 1; i < runners.length; i++) { + if (!runners[i].assertionErrors.isEmpty()) { + for(AssertionError e : runners[i].assertionErrors) { + LOG.error("AssertionError " + i, e); + } + Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)"); + } + + final int arrayLength = Runner.index2arrayLength(i); + final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold; + final FixedLengthManager m = managers.get(arrayLength, false); + if (exceedCountThreshold) { + Assert.assertNotNull(m); + } else { + Assert.assertNull(m); + } + } + } + + static void sleepMs(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + e.printStackTrace(); + Assert.fail("Sleep is interrupted: " + e); + } + } + + static class Runner implements Runnable { + static final int NUM_RUNNERS = 4; + + static int index2arrayLength(int index) { + return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1); + } + + private final ByteArrayManager bam; + final int maxArrayLength; + final int countThreshold; + final int maxArrays; + final ExecutorService pool; + final List> arrays = new ArrayList>(); + + final AtomicInteger count = new AtomicInteger(); + final int p; + private int n; + + final List assertionErrors = new ArrayList(); + + Runner(int index, int countThreshold, int maxArrays, + ExecutorService pool, int p, ByteArrayManager bam) { + this.maxArrayLength = index2arrayLength(index); + this.countThreshold = countThreshold; + this.maxArrays = maxArrays; + this.pool = pool; + this.p = p; + this.bam = bam; + } + + boolean isEmpty() { + synchronized (arrays) { + return arrays.isEmpty(); + } + } + + Future submitAllocate() { + count.incrementAndGet(); + + final Future f = pool.submit(new Callable() { + @Override + public byte[] call() throws Exception { + final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH? + 0: maxArrayLength >> 1; + final int arrayLength = DFSUtil.getRandom().nextInt( + maxArrayLength - lower) + lower + 1; + final byte[] array = bam.newByteArray(arrayLength); + try { + Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower, + maxArrayLength, array.length); + } catch(AssertionError e) { + assertionErrors.add(e); + } + return array; + } + }); + synchronized (arrays) { + arrays.add(f); + } + return f; + } + + byte[] removeFirst() throws Exception { + synchronized (arrays) { + return remove(arrays, 0); + } + } + + void recycle() throws Exception { + final byte[] a = removeFirst(); + if (a != null) { + recycle(a); + } + } + + int recycle(final byte[] array) { + return bam.release(array); + } + + Future submitRecycle(final byte[] array) { + count.decrementAndGet(); + + final Future f = pool.submit(new Callable() { + @Override + public Integer call() throws Exception { + return recycle(array); + } + }); + return f; + } + + @Override + public void run() { + for(int i = 0; i < n; i++) { + final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p; + if (isAllocate) { + submitAllocate(); + } else { + try { + final byte[] a = removeFirst(); + if (a != null) { + submitRecycle(a); + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(this + " has " + e); + } + } + + if ((i & 0xFF) == 0) { + sleepMs(100); + } + } + } + + Thread start(int n) { + this.n = n; + final Thread t = new Thread(this); + t.start(); + return t; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ": max=" + maxArrayLength + + ", count=" + count; + } + } + + static class NewByteArrayWithLimit extends ByteArrayManager { + private final int maxCount; + private int count = 0; + + NewByteArrayWithLimit(int maxCount) { + this.maxCount = maxCount; + } + + @Override + public synchronized byte[] newByteArray(int size) throws InterruptedException { + for(; count >= maxCount; ) { + wait(); + } + count++; + return new byte[size]; + } + + @Override + public synchronized int release(byte[] array) { + if (count == maxCount) { + notifyAll(); + } + count--; + return 0; + } + } + + public static void main(String[] args) throws Exception { + ((Log4JLogger)LogFactory.getLog(ByteArrayManager.class) + ).getLogger().setLevel(Level.OFF); + + final int arrayLength = 64 * 1024; //64k + final int nThreads = 512; + final int nAllocations = 1 << 15; + final int maxArrays = 1 << 10; + final int nTrials = 5; + + System.out.println("arrayLength=" + arrayLength + + ", nThreads=" + nThreads + + ", nAllocations=" + nAllocations + + ", maxArrays=" + maxArrays); + + final Random ran = DFSUtil.getRandom(); + final ByteArrayManager[] impls = { + new ByteArrayManager.NewByteArrayWithoutLimit(), + new NewByteArrayWithLimit(maxArrays), + new ByteArrayManager.Impl(new ByteArrayManager.Conf( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT, + maxArrays, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT)) + }; + final double[] avg = new double[impls.length]; + + for(int i = 0; i < impls.length; i++) { + double duration = 0; + printf("%26s:", impls[i].getClass().getSimpleName()); + for(int j = 0; j < nTrials; j++) { + final int[] sleepTime = new int[nAllocations]; + for(int k = 0; k < sleepTime.length; k++) { + sleepTime[k] = ran.nextInt(100); + } + + final long elapsed = performanceTest(arrayLength, maxArrays, nThreads, + sleepTime, impls[i]); + duration += elapsed; + printf("%5d, ", elapsed); + } + avg[i] = duration/nTrials; + printf("avg=%6.3fs", avg[i]/1000); + for(int j = 0; j < i; j++) { + printf(" (%6.2f%%)", percentageDiff(avg[j], avg[i])); + } + printf("\n"); + } + } + + static double percentageDiff(double original, double newValue) { + return (newValue - original)/original*100; + } + + static void printf(String format, Object... args) { + System.out.printf(format, args); + System.out.flush(); + } + + static long performanceTest(final int arrayLength, final int maxArrays, + final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl) + throws Exception { + final ExecutorService pool = Executors.newFixedThreadPool(nThreads); + final List> futures = new ArrayList>(sleepTimeMSs.length); + final long startTime = Time.monotonicNow(); + + for(int i = 0; i < sleepTimeMSs.length; i++) { + final long sleepTime = sleepTimeMSs[i]; + futures.add(pool.submit(new Callable() { + @Override + public Void call() throws Exception { + byte[] array = impl.newByteArray(arrayLength); + sleepMs(sleepTime); + impl.release(array); + return null; + } + })); + } + for(Future f : futures) { + f.get(); + } + + final long endTime = Time.monotonicNow(); + pool.shutdown(); + return endTime - startTime; + } +}