Revert "BackPort HBASE-11554 Remove Reusable poolmap Rpc client type. (#2208)"

incorrect commit message and author

This reverts commit c645cb54e6.
This commit is contained in:
Sean Busbey 2020-08-11 22:07:46 -05:00
parent a535c7b1af
commit b9c415ac61
2 changed files with 138 additions and 1 deletions

View File

@ -228,7 +228,7 @@ public class PoolMap<K, V> implements Map<K, V> {
}
public enum PoolType {
ThreadLocal, RoundRobin;
Reusable, ThreadLocal, RoundRobin;
public static PoolType valueOf(String poolTypeName,
PoolType defaultPoolType, PoolType... allowedPoolTypes) {
@ -270,6 +270,8 @@ public class PoolMap<K, V> implements Map<K, V> {
protected Pool<V> createPool() {
switch (poolType) {
case Reusable:
return new ReusablePool<>(poolMaxSize);
case RoundRobin:
return new RoundRobinPool<>(poolMaxSize);
case ThreadLocal:
@ -278,6 +280,51 @@ public class PoolMap<K, V> implements Map<K, V> {
return null;
}
/**
* The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
* on the {@link java.util.LinkedList} class. It essentially allows resources to be
* checked out, at which point it is removed from this pool. When the resource
* is no longer required, it should be returned to the pool in order to be
* reused.
*
* <p>
* If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
* the pool is unbounded. Otherwise, it caps the number of consumers that can
* check out a resource from this pool to the (non-zero positive) value
* specified in {@link #maxSize}.
* </p>
*
* @param <R>
* the type of the resource
*/
@SuppressWarnings("serial")
public static class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
private int maxSize;
public ReusablePool(int maxSize) {
this.maxSize = maxSize;
}
@Override
public R get() {
return poll();
}
@Override
public R put(R resource) {
if (super.size() < maxSize) {
add(resource);
}
return null;
}
@Override
public Collection<R> values() {
return this;
}
}
/**
* The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
* stores its resources in an {@link ArrayList}. It load-balances access to

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MiscTests.class, SmallTests.class })
public class TestReusablePoolMap extends PoolMapTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReusablePoolMap.class);
@Override
protected PoolType getPoolType() {
return PoolType.Reusable;
}
@Test
public void testSingleThreadedClient() throws InterruptedException, ExecutionException {
Random rand = ThreadLocalRandom.current();
String randomKey = String.valueOf(rand.nextInt());
String randomValue = String.valueOf(rand.nextInt());
// As long as we poll values we put, the pool size should remain zero
runThread(randomKey, randomValue, randomValue);
assertEquals(0, poolMap.size(randomKey));
}
@Test
public void testMultiThreadedClients() throws InterruptedException, ExecutionException {
Random rand = ThreadLocalRandom.current();
// As long as we poll values we put, the pool size should remain zero
for (int i = 0; i < POOL_SIZE; i++) {
String randomKey = String.valueOf(rand.nextInt());
String randomValue = String.valueOf(rand.nextInt());
runThread(randomKey, randomValue, randomValue);
assertEquals(0, poolMap.size(randomKey));
}
poolMap.clear();
String randomKey = String.valueOf(rand.nextInt());
for (int i = 0; i < POOL_SIZE - 1; i++) {
String randomValue = String.valueOf(rand.nextInt());
runThread(randomKey, randomValue, randomValue);
assertEquals(0, poolMap.size(randomKey));
}
assertEquals(0, poolMap.size(randomKey));
}
@Test
public void testPoolCap() throws InterruptedException, ExecutionException {
Random rand = ThreadLocalRandom.current();
// As long as we poll values we put, the pool size should remain zero
String randomKey = String.valueOf(rand.nextInt());
List<String> randomValues = new ArrayList<>();
for (int i = 0; i < POOL_SIZE * 2; i++) {
String randomValue = String.valueOf(rand.nextInt());
randomValues.add(randomValue);
runThread(randomKey, randomValue, randomValue);
}
assertEquals(0, poolMap.size(randomKey));
}
}