diff --git a/CHANGES.txt b/CHANGES.txt index 77c02a86013..c17c8eac0eb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -175,6 +175,7 @@ Release 0.91.0 - Unreleased (Erik Onnen) HBASE-3757 Upgrade to ZK 3.3.3 HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu) + HBASE-2939 Allow Client-Side Connection Pooling (Karthik Sankarachary) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index ce0ea12ad4d..5701639d769 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -134,6 +134,12 @@ public final class HConstants { /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; + /** Parameter name for HBase client IPC pool type */ + public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type"; + + /** Parameter name for HBase client IPC pool size */ + public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; + /** Used to construct the name of the log directory for a region server * Use '.' as a special character to seperate the log files from table data */ public static final String HREGION_LOGDIR_NAME = ".logs"; diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index beb7fcd3a88..470e741d8a7 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -20,22 +20,6 @@ package org.apache.hadoop.hbase.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.VersionedProtocol; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; - -import javax.net.SocketFactory; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -50,10 +34,31 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Hashtable; import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; + /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -67,8 +72,7 @@ public class HBaseClient { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final Hashtable connections = - new Hashtable(); + protected final Map connections; protected final Class valueClass; // class of call values protected int counter; // counter for call ids @@ -689,6 +693,8 @@ public class HBaseClient { this.conf = conf; this.socketFactory = factory; this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); + this.connections = new PoolMap( + getPoolType(conf), getPoolSize(conf)); } /** @@ -700,6 +706,30 @@ public class HBaseClient { this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); } + /** + * Return the pool type specified in the configuration, if it roughly equals either + * the name of {@link PoolType#Reusable} or {@link PoolType#ThreadLocal}, otherwise + * default to the former type. + * + * @param config configuration + * @return either a {@link PoolType#Reusable} or {@link PoolType#ThreadLocal} + */ + private static PoolType getPoolType(Configuration config) { + return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolType.RoundRobin, PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, otherwise the maximum allowable + * size (which for all intents and purposes represents an unbounded pool). + * + * @param config + * @return the maximum pool size + */ + private static int getPoolSize(Configuration config) { + return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); + } + /** Return the socket factory of this client * * @return this client's socket factory diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index b4a1e536280..8a64439ab67 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -537,6 +537,33 @@ public class HBaseTestingUtility { return new HTable(c, tableName); } + /** + * Create a table. + * @param tableName + * @param families + * @param c Configuration to use + * @param numVersions + * @return An HTable instance for the created table. + * @throws IOException + */ + public HTable createTable(byte[] tableName, byte[][] families, + final Configuration c, int numVersions) + throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + for(byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions, + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + desc.addFamily(hcd); + } + getHBaseAdmin().createTable(desc); + return new HTable(c, tableName); + } + /** * Create a table. * @param tableName diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 9ad0dadc9b4..7177b05c13c 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -38,6 +38,9 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -4032,6 +4035,116 @@ public class TestFromClientSide { queue.put(new Object()); } + @Test + public void testClientPoolRoundRobin() throws IOException { + final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin"); + int poolSize = 3; + int numVersions = poolSize * 2; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin"); + conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize); + + HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, + conf, Integer.MAX_VALUE); + table.setAutoFlush(true); + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, VALUE); + + Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(); + + for (int versions = 1; versions <= numVersions; versions++) { + table.put(put); + + Result result = table.get(get); + NavigableMap navigableMap = result.getMap().get(FAMILY) + .get(QUALIFIER); + + assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER + + " did not match " + versions, versions, navigableMap.size()); + for (Map.Entry entry : navigableMap.entrySet()) { + assertTrue("The value at time " + entry.getKey() + + " did not match what was put", + Bytes.equals(VALUE, entry.getValue())); + } + } + } + + @Test + public void testClientPoolThreadLocal() throws IOException { + final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal"); + + int poolSize = Integer.MAX_VALUE; + int numVersions = 3; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local"); + conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize); + + final HTable table = TEST_UTIL.createTable(tableName, + new byte[][] { FAMILY }, conf); + table.setAutoFlush(true); + final Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, VALUE); + + final Get get = new Get(ROW); + get.addColumn(FAMILY, QUALIFIER); + get.setMaxVersions(); + + for (int versions = 1; versions <= numVersions; versions++) { + table.put(put); + + Result result = table.get(get); + NavigableMap navigableMap = result.getMap().get(FAMILY) + .get(QUALIFIER); + + assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER + + " did not match " + versions, versions, navigableMap.size()); + for (Map.Entry entry : navigableMap.entrySet()) { + assertTrue("The value at time " + entry.getKey() + + " did not match what was put", + Bytes.equals(VALUE, entry.getValue())); + } + } + + final Object waitLock = new Object(); + + ExecutorService executorService = Executors.newFixedThreadPool(numVersions); + for (int versions = numVersions; versions < numVersions * 2; versions++) { + final int versionsCopy = versions; + executorService.submit(new Callable() { + @Override + public Void call() { + try { + table.put(put); + + Result result = table.get(get); + NavigableMap navigableMap = result.getMap() + .get(FAMILY).get(QUALIFIER); + + assertEquals("The number of versions of '" + FAMILY + ":" + + QUALIFIER + " did not match " + versionsCopy, versionsCopy, + navigableMap.size()); + for (Map.Entry entry : navigableMap.entrySet()) { + assertTrue("The value at time " + entry.getKey() + + " did not match what was put", + Bytes.equals(VALUE, entry.getValue())); + } + synchronized (waitLock) { + waitLock.wait(); + } + } catch (Exception e) { + } + + return null; + } + }); + } + synchronized (waitLock) { + waitLock.notifyAll(); + } + executorService.shutdownNow(); + } }