HBASE-2939 Allow Client-Side Connection Pooling

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1095160 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2011-04-19 18:32:11 +00:00
parent 49f339cccf
commit 1183777e8a
5 changed files with 195 additions and 18 deletions


@@ -175,6 +175,7 @@ Release 0.91.0 - Unreleased
(Erik Onnen)
HBASE-3757 Upgrade to ZK 3.3.3
HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu)
HBASE-2939 Allow Client-Side Connection Pooling (Karthik Sankarachary)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel


@@ -134,6 +134,12 @@ public final class HConstants {
/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";
/** Parameter name for HBase client IPC pool type */
public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type";
/** Parameter name for HBase client IPC pool size */
public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size";
/** Used to construct the name of the log directory for a region server
* Use '.' as a special character to separate the log files from table data */
public static final String HREGION_LOGDIR_NAME = ".logs";
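For context, a minimal, hypothetical usage sketch of the two new properties from a client's point of view. Only the property names (and the "round-robin"/"thread-local" values exercised by the tests further down) come from this patch; the surrounding HBaseConfiguration/HTable wiring and the table name are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;

public class PooledClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Pick the pooling strategy and how many connections to keep per server.
    conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin");   // or "thread-local"
    conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 3);
    // Any HTable created with this Configuration uses the pooled IPC client.
    HTable table = new HTable(conf, "my_table");                      // hypothetical table name
    // ... issue Puts/Gets as usual ...
    table.close();
  }
}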


@@ -20,22 +20,6 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -50,10 +34,31 @@ import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@@ -67,8 +72,7 @@ public class HBaseClient {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
protected final Map<ConnectionId, Connection> connections;
protected final Class<? extends Writable> valueClass; // class of call values
protected int counter; // counter for call ids
@@ -689,6 +693,8 @@ public class HBaseClient {
this.conf = conf;
this.socketFactory = factory;
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
this.connections = new PoolMap<ConnectionId, Connection>(
getPoolType(conf), getPoolSize(conf));
}
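To make the Hashtable-to-PoolMap change concrete, here is a simplified, self-contained illustration of the round-robin strategy; it is not the real org.apache.hadoop.hbase.util.PoolMap, just the idea behind it: each key (in the client, the ConnectionId of a remote server) owns a small pool that grows up to the configured bound and is then handed out in rotation, while the thread-local strategy would instead dedicate one value to each calling thread.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only -- not the actual PoolMap used by HBaseClient.
final class RoundRobinPoolSketch<K, V> {

  interface Factory<T> {
    T create();
  }

  private final Map<K, List<V>> pools = new HashMap<K, List<V>>();
  private final int maxPerKey;
  private int cursor;

  RoundRobinPoolSketch(int maxPerKey) {
    this.maxPerKey = maxPerKey;
  }

  // Grow the per-key pool until it reaches the bound, then rotate over the
  // existing values so several callers share a fixed set of connections.
  synchronized V get(K key, Factory<V> factory) {
    List<V> pool = pools.get(key);
    if (pool == null) {
      pool = new ArrayList<V>();
      pools.put(key, pool);
    }
    if (pool.size() < maxPerKey) {
      V value = factory.create();
      pool.add(value);
      return value;
    }
    cursor = (cursor + 1) % pool.size();
    return pool.get(cursor);
  }
}

With a bound of 1 this degenerates to exactly one value per key, which is why the default pool size of 1 (see getPoolSize below) preserves the old one-connection-per-server behaviour.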
/**
@@ -700,6 +706,30 @@ public class HBaseClient {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
/**
* Return the pool type specified in the configuration, which should loosely match
* the name of either {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal};
* any other or missing value falls back to {@link PoolType#RoundRobin}.
*
* @param config configuration
* @return either {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal}
*/
private static PoolType getPoolType(Configuration config) {
return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
PoolType.RoundRobin, PoolType.ThreadLocal);
}
/**
* Return the pool size specified in the configuration, which defaults to 1 when
* the property is not set (one connection per {@link ConnectionId}, matching the
* previous non-pooled behaviour).
*
* @param config configuration
* @return the maximum number of connections to keep per {@link ConnectionId}
*/
private static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
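Since getPoolType and getPoolSize are private, the sketch below simply repeats the same two lookups to show what an out-of-the-box configuration resolves to: RoundRobin with a bound of 1, i.e. one connection per ConnectionId, which matches the behaviour of the Hashtable this patch replaces. Everything here is taken from the code above; the class wrapper is just scaffolding.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;

public class PoolDefaultsSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Neither property set: fall back to round-robin with a single connection per key.
    PoolType type = PoolType.valueOf(conf.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
        PoolType.RoundRobin, PoolType.ThreadLocal);
    int size = conf.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
    System.out.println(type + " / " + size);   // expected: RoundRobin / 1
  }
}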
/** Return the socket factory of this client
*
* @return this client's socket factory


@@ -537,6 +537,33 @@ public class HBaseTestingUtility {
return new HTable(c, tableName);
}
/**
* Create a table.
* @param tableName name of the table to create
* @param families column families to add to the table
* @param c Configuration to use for the returned HTable
* @param numVersions maximum number of versions to keep per column family
* @return An HTable instance for the created table.
* @throws IOException
*/
public HTable createTable(byte[] tableName, byte[][] families,
final Configuration c, int numVersions)
throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
for(byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
HColumnDescriptor.DEFAULT_COMPRESSION,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
desc.addFamily(hcd);
}
getHBaseAdmin().createTable(desc);
return new HTable(c, tableName);
}
/**
* Create a table.
* @param tableName


@@ -38,6 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -4032,6 +4035,116 @@ public class TestFromClientSide {
queue.put(new Object());
}
@Test
public void testClientPoolRoundRobin() throws IOException {
final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin");
int poolSize = 3;
int numVersions = poolSize * 2;
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin");
conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY },
conf, Integer.MAX_VALUE);
table.setAutoFlush(true);
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
Get get = new Get(ROW);
get.addColumn(FAMILY, QUALIFIER);
get.setMaxVersions();
for (int versions = 1; versions <= numVersions; versions++) {
table.put(put);
Result result = table.get(get);
NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
.get(QUALIFIER);
assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
+ " did not match " + versions, versions, navigableMap.size());
for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
assertTrue("The value at time " + entry.getKey()
+ " did not match what was put",
Bytes.equals(VALUE, entry.getValue()));
}
}
}
@Test
public void testClientPoolThreadLocal() throws IOException {
final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal");
int poolSize = Integer.MAX_VALUE;
int numVersions = 3;
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local");
conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
final HTable table = TEST_UTIL.createTable(tableName,
new byte[][] { FAMILY }, conf);
table.setAutoFlush(true);
final Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
final Get get = new Get(ROW);
get.addColumn(FAMILY, QUALIFIER);
get.setMaxVersions();
for (int versions = 1; versions <= numVersions; versions++) {
table.put(put);
Result result = table.get(get);
NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
.get(QUALIFIER);
assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
+ " did not match " + versions, versions, navigableMap.size());
for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
assertTrue("The value at time " + entry.getKey()
+ " did not match what was put",
Bytes.equals(VALUE, entry.getValue()));
}
}
final Object waitLock = new Object();
ExecutorService executorService = Executors.newFixedThreadPool(numVersions);
for (int versions = numVersions; versions < numVersions * 2; versions++) {
final int versionsCopy = versions;
executorService.submit(new Callable<Void>() {
@Override
public Void call() {
try {
table.put(put);
Result result = table.get(get);
NavigableMap<Long, byte[]> navigableMap = result.getMap()
.get(FAMILY).get(QUALIFIER);
assertEquals("The number of versions of '" + FAMILY + ":"
+ QUALIFIER + " did not match " + versionsCopy, versionsCopy,
navigableMap.size());
for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
assertTrue("The value at time " + entry.getKey()
+ " did not match what was put",
Bytes.equals(VALUE, entry.getValue()));
}
synchronized (waitLock) {
waitLock.wait();
}
} catch (Exception e) {
// ignored; this includes the InterruptedException raised when the executor is shut down below
}
return null;
}
});
}
synchronized (waitLock) {
waitLock.notifyAll();
}
executorService.shutdownNow();
}
}