diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f8cdb576e52..2dbe263f1c3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.htrace.Trace; + import com.google.common.annotations.VisibleForTesting; /** @@ -288,6 +288,10 @@ class AsyncProcess { this.rpcFactory = rpcFactory; } + /** + * @return pool if non null, otherwise returns this.pool if non null, otherwise throws + * RuntimeException + */ private ExecutorService getPool(ExecutorService pool) { if (pool != null) return pool; if (this.pool != null) return this.pool; @@ -352,8 +356,8 @@ class AsyncProcess { RegionLocations locs = hConnection.locateRegion( tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { - throw new IOException("#" + id + ", no location found, aborting submit for" + - " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow())); + throw new IOException("#" + id + ", no location found, aborting submit for" + + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); } loc = locs.getDefaultRegionLocation(); } catch (IOException ex) { @@ -383,15 +387,24 @@ class AsyncProcess { if (retainedActions.isEmpty()) return NO_REQS_RESULT; + return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, + locationErrors, locationErrorRows, actionsByServer, pool); + } + + AsyncRequestFuture submitMultiActions(TableName tableName, + List> retainedActions, long nonceGroup, Batch.Callback callback, + Object[] results, boolean needResults, List locationErrors, + List locationErrorRows, Map> actionsByServer, + ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, null, needResults); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); ars.manageError(originalIndex, row, - Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); + Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } ars.sendMultiAction(actionsByServer, 1, null, false); @@ -406,7 +419,7 @@ class AsyncProcess { * @param actionsByServer the multiaction per server * @param nonceGroup Nonce group. */ - private void addAction(ServerName server, byte[] regionName, Action action, + private static void addAction(ServerName server, byte[] regionName, Action action, Map> actionsByServer, long nonceGroup) { MultiAction multiAction = actionsByServer.get(server); if (multiAction == null) { @@ -531,7 +544,7 @@ class AsyncProcess { return ars; } - private void setNonce(NonceGenerator ng, Row r, Action action) { + private static void setNonce(NonceGenerator ng, Row r, Action action) { if (!(r instanceof Append) && !(r instanceof Increment)) return; action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 26da937d879..8a6575e3222 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -318,6 +318,13 @@ public class HTable implements HTableInterface, RegionLocator { cleanupConnectionOnClose = false; } + /** + * @return maxKeyValueSize from configuration. + */ + public static int getMaxKeyValueSize(Configuration conf) { + return conf.getInt("hbase.client.keyvalue.maxsize", -1); + } + /** * setup this HTable's parameter based on the passed configuration */ @@ -348,8 +355,7 @@ public class HTable implements HTableInterface, RegionLocator { ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); multiAp = this.connection.getAsyncProcess(); - this.maxKeyValueSize = this.configuration.getInt( - "hbase.client.keyvalue.maxsize", -1); + this.maxKeyValueSize = getMaxKeyValueSize(this.configuration); this.closed = false; } @@ -1470,7 +1476,12 @@ public class HTable implements HTableInterface, RegionLocator { } // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException{ + public void validatePut(final Put put) throws IllegalArgumentException { + validatePut(put, maxKeyValueSize); + } + + // validate for well-formedness + public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 9f5e8361e91..e8c690974e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,8 +40,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -67,35 +67,35 @@ public class HTableMultiplexer { static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms"; - private Map tableNameToHTableMap; - /** The map between each region server to its corresponding buffer queue */ - private Map> - serverToBufferQueueMap; + private final Map> serverToBufferQueueMap = + new ConcurrentHashMap<>(); /** The map between each region server to its flush worker */ - private Map serverToFlushWorkerMap; + private final Map serverToFlushWorkerMap = + new ConcurrentHashMap<>(); - private Configuration conf; - private int retryNum; + private final Configuration conf; + private final ClusterConnection conn; + private final ExecutorService pool; + private final int retryNum; private int perRegionServerBufferQueueSize; + private final int maxKeyValueSize; /** - * * @param conf The HBaseConfiguration - * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops - * for each region server before dropping the request. + * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for + * each region server before dropping the request. */ - public HTableMultiplexer(Configuration conf, - int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException { + public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) + throws IOException { this.conf = conf; - this.serverToBufferQueueMap = new ConcurrentHashMap>(); - this.serverToFlushWorkerMap = new ConcurrentHashMap(); - this.tableNameToHTableMap = new ConcurrentSkipListMap(); + this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf); + this.pool = HTable.getDefaultExecutor(conf); this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; + this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); } /** @@ -110,10 +110,6 @@ public class HTableMultiplexer { return put(tableName, put, this.retryNum); } - public boolean put(byte[] tableName, final Put put) throws IOException { - return put(TableName.valueOf(tableName), put); - } - /** * The puts request will be buffered by their corresponding buffer queue. * Return the list of puts which could not be queued. @@ -165,15 +161,14 @@ public class HTableMultiplexer { return false; } - LinkedBlockingQueue queue; - HTable htable = getHTable(tableName); try { - htable.validatePut(put); - HRegionLocation loc = htable.getRegionLocation(put.getRow(), false); + HTable.validatePut(put, maxKeyValueSize); + HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); if (loc != null) { // Add the put pair into its corresponding queue. - queue = addNewRegionServer(loc, htable); - // Generate a MultiPutStatus obj and offer it into the queue + + LinkedBlockingQueue queue = getQueue(loc); + // Generate a MultiPutStatus object and offer it into the queue PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry); return queue.offer(s); @@ -196,43 +191,30 @@ public class HTableMultiplexer { return new HTableMultiplexerStatus(serverToFlushWorkerMap); } + private LinkedBlockingQueue getQueue(HRegionLocation addr) { + LinkedBlockingQueue queue = serverToBufferQueueMap.get(addr); + if (queue == null) { + synchronized (this.serverToBufferQueueMap) { + queue = serverToBufferQueueMap.get(addr); + if (queue == null) { + // Create a queue for the new region server + queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); + serverToBufferQueueMap.put(addr, queue); - private HTable getHTable(TableName tableName) throws IOException { - HTable htable = this.tableNameToHTableMap.get(tableName); - if (htable == null) { - synchronized (this.tableNameToHTableMap) { - htable = this.tableNameToHTableMap.get(tableName); - if (htable == null) { - htable = new HTable(conf, tableName); - this.tableNameToHTableMap.put(tableName, htable); + // Create the flush worker + HTableFlushWorker worker = + new HTableFlushWorker(conf, this.conn, addr, this, queue, pool); + this.serverToFlushWorkerMap.put(addr, worker); + + // Launch a daemon thread to flush the puts + // from the queue to its corresponding region server. + String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++); + Thread t = new Thread(worker, name); + t.setDaemon(true); + t.start(); } } } - return htable; - } - - private synchronized LinkedBlockingQueue addNewRegionServer( - HRegionLocation addr, HTable htable) { - LinkedBlockingQueue queue = - serverToBufferQueueMap.get(addr); - if (queue == null) { - // Create a queue for the new region server - queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); - serverToBufferQueueMap.put(addr, queue); - - // Create the flush worker - HTableFlushWorker worker = new HTableFlushWorker(conf, addr, - this, queue, htable); - this.serverToFlushWorkerMap.put(addr, worker); - - // Launch a daemon thread to flush the puts - // from the queue to its corresponding region server. - String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" - + (poolID++); - Thread t = new Thread(worker, name); - t.setDaemon(true); - t.start(); - } return queue; } @@ -405,28 +387,25 @@ public class HTableMultiplexer { } private static class HTableFlushWorker implements Runnable { - private HRegionLocation addr; - private Configuration conf; - private LinkedBlockingQueue queue; - private HTableMultiplexer htableMultiplexer; - private AtomicLong totalFailedPutCount; - private AtomicInteger currentProcessingPutCount; - private AtomicAverageCounter averageLatency; - private AtomicLong maxLatency; - private HTable htable; // For Multi + private final HRegionLocation addr; + private final Configuration conf; + private final ClusterConnection conn; + private final LinkedBlockingQueue queue; + private final HTableMultiplexer htableMultiplexer; + private final AtomicLong totalFailedPutCount = new AtomicLong(0); + private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0); + private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); + private final AtomicLong maxLatency = new AtomicLong(0); + private final ExecutorService pool; - public HTableFlushWorker(Configuration conf, HRegionLocation addr, - HTableMultiplexer htableMultiplexer, - LinkedBlockingQueue queue, HTable htable) { + public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, + HTableMultiplexer htableMultiplexer, LinkedBlockingQueue queue, ExecutorService pool) { this.addr = addr; this.conf = conf; + this.conn = conn; this.htableMultiplexer = htableMultiplexer; this.queue = queue; - this.totalFailedPutCount = new AtomicLong(0); - this.currentProcessingPutCount = new AtomicInteger(0); - this.averageLatency = new AtomicAverageCounter(); - this.maxLatency = new AtomicLong(0); - this.htable = htable; + this.pool = pool; } public long getTotalFailedCount() { @@ -466,7 +445,7 @@ public class HTableMultiplexer { @edu.umd.cs.findbugs.annotations.SuppressWarnings (value = "REC_CATCH_EXCEPTION", justification = "na") public void run() { - List processingList = new ArrayList(); + List processingList = new ArrayList<>(); /** * The frequency in milliseconds for the current thread to process the corresponding * buffer queue. @@ -481,6 +460,8 @@ public class HTableMultiplexer { Thread.currentThread().interrupt(); } + AsyncProcess ap = conn.getAsyncProcess(); + long start, elapsed; int failedCount = 0; while (true) { @@ -496,16 +477,29 @@ public class HTableMultiplexer { currentProcessingPutCount.set(processingList.size()); if (processingList.size() > 0) { - ArrayList list = new ArrayList(processingList.size()); - for (PutStatus putStatus: processingList) { - list.add(putStatus.getPut()); + List> retainedActions = new ArrayList<>(processingList.size()); + MultiAction actions = new MultiAction<>(); + for (int i = 0; i < processingList.size(); i++) { + PutStatus putStatus = processingList.get(i); + Action action = new Action(putStatus.getPut(), i); + actions.add(putStatus.getRegionInfo().getRegionName(), action); + retainedActions.add(action); } - // Process this multiput request - List failed = null; - Object[] results = new Object[list.size()]; + // Process this multi-put request + List failed = null; + Object[] results = new Object[actions.size()]; + ServerName server = addr.getServerName(); + Map> actionsByServer = + Collections.singletonMap(server, actions); try { - htable.batch(list, results); + AsyncRequestFuture arf = + ap.submitMultiActions(null, retainedActions, 0L, null, results, + true, null, null, actionsByServer, pool); + arf.waitUntilDone(); + if (arf.hasError()) { + throw arf.getErrors(); + } } catch (IOException e) { LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " + addr.getHostnamePort()); @@ -515,35 +509,26 @@ public class HTableMultiplexer { // results are returned in the same order as the requests in list // walk the list backwards, so we can remove from list without // impacting the indexes of earlier members - for (int i = results.length - 1; i >= 0; i--) { - if (results[i] instanceof Result) { - // successful Puts are removed from the list here. - list.remove(i); + for (int i = 0; i < results.length; i++) { + if (results[i] == null) { + if (failed == null) { + failed = new ArrayList(); + } + failed.add(processingList.get(i)); } } - failed = list; } if (failed != null) { - if (failed.size() == processingList.size()) { - // All the puts for this region server are failed. Going to retry it later - for (PutStatus putStatus: processingList) { - if (!resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } - } - } else { - Set failedPutSet = new HashSet(failed); - for (PutStatus putStatus: processingList) { - if (failedPutSet.contains(putStatus.getPut()) - && !resubmitFailedPut(putStatus, this.addr)) { - failedCount++; - } + // Resubmit failed puts + for (PutStatus putStatus : processingList) { + if (!resubmitFailedPut(putStatus, this.addr)) { + failedCount++; } } + // Update the totalFailedCount + this.totalFailedPutCount.addAndGet(failedCount); } - // Update the totalFailedCount - this.totalFailedPutCount.addAndGet(failedCount); elapsed = EnvironmentEdgeManager.currentTime() - start; // Update latency counters @@ -580,7 +565,7 @@ public class HTableMultiplexer { // Log all the exceptions and move on LOG.debug("Caught some exceptions " + e + " when flushing puts to region server " - + addr.getHostnamePort()); + + addr.getHostnamePort(), e); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java index 4fa66788972..26fe485eab9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java @@ -27,8 +27,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -64,10 +64,27 @@ public class TestHTableMultiplexer { TEST_UTIL.shutdownMiniCluster(); } + private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality) + throws Exception { + // verify that the Get returns the correct result + Result r; + Get get = new Get(row); + get.addColumn(FAMILY, QUALIFIER); + int nbTry = 0; + do { + assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50); + nbTry++; + Thread.sleep(100); + r = htable.get(get); + } while (r == null || r.getValue(FAMILY, QUALIFIER) == null); + assertEquals("value", Bytes.toStringBinary(VALUE1), + Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER))); + } + @Test public void testHTableMultiplexer() throws Exception { - TableName TABLE = - TableName.valueOf("testHTableMultiplexer"); + TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1"); + TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2"); final int NUM_REGIONS = 10; final int VERSION = 3; List failedPuts; @@ -76,35 +93,35 @@ public class TestHTableMultiplexer { HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); - HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION, + HTable htable1 = + TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); - TEST_UTIL.waitUntilAllRegionsAssigned(TABLE); + HTable htable2 = + TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"), + Bytes.toBytes("zzzzz"), NUM_REGIONS); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2); - byte[][] startRows = ht.getStartKeys(); - byte[][] endRows = ht.getEndKeys(); + byte[][] startRows = htable1.getStartKeys(); + byte[][] endRows = htable1.getEndKeys(); // SinglePut case for (int i = 0; i < NUM_REGIONS; i++) { byte [] row = startRows[i]; if (row == null || row.length <= 0) continue; - Put put = new Put(row); - put.add(FAMILY, QUALIFIER, VALUE1); - success = multiplexer.put(TABLE, put); - assertTrue(success); + Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1); + success = multiplexer.put(TABLE_1, put); + assertTrue("multiplexer.put returns", success); - LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1)); + put = new Put(row).add(FAMILY, QUALIFIER, VALUE1); + success = multiplexer.put(TABLE_2, put); + assertTrue("multiplexer.put failed", success); + + LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1)); // verify that the Get returns the correct result - Get get = new Get(startRows[i]); - get.addColumn(FAMILY, QUALIFIER); - Result r; - int nbTry = 0; - do { - assertTrue(nbTry++ < 50); - Thread.sleep(100); - r = ht.get(get); - } while (r == null || r.getValue(FAMILY, QUALIFIER) == null); - assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER))); + checkExistence(htable1, startRows[i], FAMILY, QUALIFIER); + checkExistence(htable2, startRows[i], FAMILY, QUALIFIER); } // MultiPut case @@ -116,7 +133,7 @@ public class TestHTableMultiplexer { put.add(FAMILY, QUALIFIER, VALUE2); multiput.add(put); } - failedPuts = multiplexer.put(TABLE, multiput); + failedPuts = multiplexer.put(TABLE_1, multiput); assertTrue(failedPuts == null); // verify that the Get returns the correct result @@ -130,7 +147,7 @@ public class TestHTableMultiplexer { do { assertTrue(nbTry++ < 50); Thread.sleep(100); - r = ht.get(get); + r = htable1.get(get); } while (r == null || r.getValue(FAMILY, QUALIFIER) == null || Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0); }