diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 569ab217e44..3329baf1506 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -536,6 +536,9 @@ Release 2.0.5-beta - UNRELEASED HADOOP-9358. "Auth failed" log should include exception string (todd) + HADOOP-9401. CodecPool: Add counters for number of (de)compressors + leased out. (kkambatl via tucu) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index ed76012f808..63e3a09c307 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +30,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + /** * A global compressor/decompressor pool used to save and reuse * (possibly native) compression/decompression codecs. @@ -52,6 +57,29 @@ public class CodecPool { private static final Map, List> decompressorPool = new HashMap, List>(); + private static LoadingCache, AtomicInteger> createCache( + Class klass) { + return CacheBuilder.newBuilder().build( + new CacheLoader, AtomicInteger>() { + @Override + public AtomicInteger load(Class key) throws Exception { + return new AtomicInteger(); + } + }); + } + + /** + * Map to track the number of leased compressors + */ + private static final LoadingCache, AtomicInteger> compressorCounts = + createCache(Compressor.class); + + /** + * Map to tracks the number of leased decompressors + */ + private static final LoadingCache, AtomicInteger> decompressorCounts = + createCache(Decompressor.class); + private static T borrow(Map, List> pool, Class codecClass) { T codec = null; @@ -90,6 +118,21 @@ public class CodecPool { } } + @SuppressWarnings("unchecked") + private static int getLeaseCount( + LoadingCache, AtomicInteger> usageCounts, + Class codecClass) { + return usageCounts.getUnchecked((Class) codecClass).get(); + } + + private static void updateLeaseCount( + LoadingCache, AtomicInteger> usageCounts, T codec, int delta) { + if (codec != null) { + Class codecClass = ReflectionUtils.getClass(codec); + usageCounts.getUnchecked(codecClass).addAndGet(delta); + } + } + /** * Get a {@link Compressor} for the given {@link CompressionCodec} from the * pool or a new one. @@ -111,6 +154,7 @@ public class CodecPool { LOG.debug("Got recycled compressor"); } } + updateLeaseCount(compressorCounts, compressor, 1); return compressor; } @@ -137,6 +181,7 @@ public class CodecPool { LOG.debug("Got recycled decompressor"); } } + updateLeaseCount(decompressorCounts, decompressor, 1); return decompressor; } @@ -155,6 +200,7 @@ public class CodecPool { } compressor.reset(); payback(compressorPool, compressor); + updateLeaseCount(compressorCounts, compressor, -1); } /** @@ -173,5 +219,24 @@ public class CodecPool { } decompressor.reset(); payback(decompressorPool, decompressor); + updateLeaseCount(decompressorCounts, decompressor, -1); + } + + /** + * Return the number of leased {@link Compressor}s for this + * {@link CompressionCodec} + */ + public static int getLeasedCompressorsCount(CompressionCodec codec) { + return (codec == null) ? 0 : getLeaseCount(compressorCounts, + codec.getCompressorType()); + } + + /** + * Return the number of leased {@link Decompressor}s for this + * {@link CompressionCodec} + */ + public static int getLeasedDecompressorsCount(CompressionCodec codec) { + return (codec == null) ? 0 : getLeaseCount(decompressorCounts, + codec.getDecompressorType()); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java new file mode 100644 index 00000000000..551f282889e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.Test; + +public class TestCodecPool { + private final String LEASE_COUNT_ERR = + "Incorrect number of leased (de)compressors"; + DefaultCodec codec; + + @Before + public void setup() { + this.codec = new DefaultCodec(); + this.codec.setConf(new Configuration()); + } + + @Test(timeout = 1000) + public void testCompressorPoolCounts() { + // Get two compressors and return them + Compressor comp1 = CodecPool.getCompressor(codec); + Compressor comp2 = CodecPool.getCompressor(codec); + assertEquals(LEASE_COUNT_ERR, 2, + CodecPool.getLeasedCompressorsCount(codec)); + + CodecPool.returnCompressor(comp2); + assertEquals(LEASE_COUNT_ERR, 1, + CodecPool.getLeasedCompressorsCount(codec)); + + CodecPool.returnCompressor(comp1); + assertEquals(LEASE_COUNT_ERR, 0, + CodecPool.getLeasedCompressorsCount(codec)); + } + + @Test(timeout = 1000) + public void testDecompressorPoolCounts() { + // Get two decompressors and return them + Decompressor decomp1 = CodecPool.getDecompressor(codec); + Decompressor decomp2 = CodecPool.getDecompressor(codec); + assertEquals(LEASE_COUNT_ERR, 2, + CodecPool.getLeasedDecompressorsCount(codec)); + + CodecPool.returnDecompressor(decomp2); + assertEquals(LEASE_COUNT_ERR, 1, + CodecPool.getLeasedDecompressorsCount(codec)); + + CodecPool.returnDecompressor(decomp1); + assertEquals(LEASE_COUNT_ERR, 0, + CodecPool.getLeasedDecompressorsCount(codec)); + } +}