diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 3cd38f34161..f174c964ade 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -228,7 +228,7 @@ public class PoolMap implements Map { } public enum PoolType { - ThreadLocal, RoundRobin; + Reusable, ThreadLocal, RoundRobin; public static PoolType valueOf(String poolTypeName, PoolType defaultPoolType, PoolType... allowedPoolTypes) { @@ -270,6 +270,8 @@ public class PoolMap implements Map { protected Pool createPool() { switch (poolType) { + case Reusable: + return new ReusablePool<>(poolMaxSize); case RoundRobin: return new RoundRobinPool<>(poolMaxSize); case ThreadLocal: @@ -278,6 +280,51 @@ public class PoolMap implements Map { return null; } + /** + * The ReusablePool represents a {@link PoolMap.Pool} that builds + * on the {@link java.util.LinkedList} class. It essentially allows resources to be + * checked out, at which point it is removed from this pool. When the resource + * is no longer required, it should be returned to the pool in order to be + * reused. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of + * the pool is unbounded. Otherwise, it caps the number of consumers that can + * check out a resource from this pool to the (non-zero positive) value + * specified in {@link #maxSize}. + *

+ * + * @param + * the type of the resource + */ + @SuppressWarnings("serial") + public static class ReusablePool extends ConcurrentLinkedQueue implements Pool { + private int maxSize; + + public ReusablePool(int maxSize) { + this.maxSize = maxSize; + + } + + @Override + public R get() { + return poll(); + } + + @Override + public R put(R resource) { + if (super.size() < maxSize) { + add(resource); + } + return null; + } + + @Override + public Collection values() { + return this; + } + } + /** * The RoundRobinPool represents a {@link PoolMap.Pool}, which * stores its resources in an {@link ArrayList}. It load-balances access to diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java new file mode 100644 index 00000000000..3fcaebb5fe5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestReusablePoolMap extends PoolMapTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReusablePoolMap.class); + + @Override + protected PoolType getPoolType() { + return PoolType.Reusable; + } + + @Test + public void testSingleThreadedClient() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + String randomKey = String.valueOf(rand.nextInt()); + String randomValue = String.valueOf(rand.nextInt()); + // As long as we poll values we put, the pool size should remain zero + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + + @Test + public void testMultiThreadedClients() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + // As long as we poll values we put, the pool size should remain zero + for (int i = 0; i < POOL_SIZE; i++) { + String randomKey = String.valueOf(rand.nextInt()); + String randomValue = String.valueOf(rand.nextInt()); + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + poolMap.clear(); + String randomKey = String.valueOf(rand.nextInt()); + for (int i = 0; i < POOL_SIZE - 1; i++) { + String randomValue = String.valueOf(rand.nextInt()); + runThread(randomKey, randomValue, randomValue); + assertEquals(0, poolMap.size(randomKey)); + } + assertEquals(0, poolMap.size(randomKey)); + } + + @Test + public void testPoolCap() throws InterruptedException, ExecutionException { + Random rand = ThreadLocalRandom.current(); + // As long as we poll values we put, the pool size should remain zero + String randomKey = String.valueOf(rand.nextInt()); + List randomValues = new ArrayList<>(); + for (int i = 0; i < POOL_SIZE * 2; i++) { + String randomValue = String.valueOf(rand.nextInt()); + randomValues.add(randomValue); + runThread(randomKey, randomValue, randomValue); + } + assertEquals(0, poolMap.size(randomKey)); + } +}