diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index e8c690974e5..fae0a940fa9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -28,23 +28,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -63,24 +68,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceStability.Evolving
 public class HTableMultiplexer {
   private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
-  private static int poolID = 0;
-  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
-
-  /** The map between each region server to its corresponding buffer queue */
-  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
-      new ConcurrentHashMap<>();
+  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
+      "hbase.tablemultiplexer.flush.period.ms";
+  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
+  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
+      "hbase.client.max.retries.in.queue";

   /** The map between each region server to its flush worker */
-  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();

   private final Configuration conf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
-  private int perRegionServerBufferQueueSize;
+  private final int perRegionServerBufferQueueSize;
   private final int maxKeyValueSize;
+  private final ScheduledExecutorService executor;
+  private final long flushPeriod;

   /**
    * @param conf The HBaseConfiguration
@@ -96,6 +102,11 @@ public class HTableMultiplexer {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
+    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
+    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
+    this.executor =
+        Executors.newScheduledThreadPool(initThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
   }

   /**
@@ -106,7 +117,7 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(TableName tableName, final Put put) throws IOException {
+  public boolean put(TableName tableName, final Put put) {
     return put(tableName, put, this.retryNum);
   }

@@ -118,8 +129,7 @@ public class HTableMultiplexer {
    * @return the list of puts which could not be queued
    * @throws IOException
    */
-  public List<Put> put(TableName tableName, final List<Put> puts)
-      throws IOException {
+  public List<Put> put(TableName tableName, final List<Put> puts) {
     if (puts == null)
       return null;

@@ -140,23 +150,22 @@ public class HTableMultiplexer {
     return failedPuts;
   }

-  public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, List) } instead.
+   */
+  @Deprecated
+  public List<Put> put(byte[] tableName, final List<Put> puts) {
     return put(TableName.valueOf(tableName), puts);
   }
-
-
+
   /**
    * The put request will be buffered by its corresponding buffer queue. And the put request will be
    * retried before dropping the request.
    * Return false if the queue is already full.
-   * @param tableName
-   * @param put
-   * @param retry
    * @return true if the request can be accepted by its corresponding buffer queue.
   * @throws IOException
   */
-  public boolean put(final TableName tableName, final Put put, int retry)
-      throws IOException {
+  public boolean put(final TableName tableName, final Put put, int retry) {
     if (retry <= 0) {
       return false;
     }
@@ -166,24 +175,35 @@ public class HTableMultiplexer {
       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
-        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+        // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
         return queue.offer(s);
       }
-    } catch (Exception e) {
-      LOG.debug("Cannot process the put " + put + " because of " + e);
+    } catch (IOException e) {
+      LOG.debug("Cannot process the put " + put, e);
     }
     return false;
   }

-  public boolean put(final byte[] tableName, final Put put, int retry)
-      throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, Put) } instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, final Put put, int retry) {
     return put(TableName.valueOf(tableName), put, retry);
   }

+  /**
+   * Deprecated. Use {@link #put(TableName, Put)} instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, Put put) {
+    return put(TableName.valueOf(tableName), put);
+  }
+
   /**
    * @return the current HTableMultiplexerStatus
    */
@@ -192,30 +212,20 @@ public class HTableMultiplexer {
   }

   private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
-    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      synchronized (this.serverToBufferQueueMap) {
-        queue = serverToBufferQueueMap.get(addr);
-        if (queue == null) {
-          // Create a queue for the new region server
-          queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-          serverToBufferQueueMap.put(addr, queue);
-
+    FlushWorker worker = serverToFlushWorkerMap.get(addr);
+    if (worker == null) {
+      synchronized (this.serverToFlushWorkerMap) {
+        worker = serverToFlushWorkerMap.get(addr);
+        if (worker == null) {
           // Create the flush worker
-          HTableFlushWorker worker =
-              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+              pool, executor);
           this.serverToFlushWorkerMap.put(addr, worker);
-
-          // Launch a daemon thread to flush the puts
-          // from the queue to its corresponding region server.
-          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
-          Thread t = new Thread(worker, name);
-          t.setDaemon(true);
-          t.start();
+          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
         }
       }
     }
-    return queue;
+    return worker.getQueue();
   }

   /**
@@ -223,7 +233,7 @@ public class HTableMultiplexer {
    * report the number of buffered requests and the number of the failed (dropped) requests
    * in total or on per region server basis.
   */
-  static class HTableMultiplexerStatus {
+  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
@@ -234,7 +244,7 @@ public class HTableMultiplexer {
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
@@ -247,17 +257,17 @@ public class HTableMultiplexer {
    }

    private void initialize(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
-      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
+      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
          .entrySet()) {
        HRegionLocation addr = entry.getKey();
-        HTableFlushWorker worker = entry.getValue();
+        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
@@ -325,25 +335,15 @@ public class HTableMultiplexer {
   }

   private static class PutStatus {
-    private final HRegionInfo regionInfo;
-    private final Put put;
-    private final int retryCount;
-    public PutStatus(final HRegionInfo regionInfo, final Put put,
-        final int retryCount) {
+    public final HRegionInfo regionInfo;
+    public final Put put;
+    public final int retryCount;
+
+    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.retryCount = retryCount;
    }
-
-    public HRegionInfo getRegionInfo() {
-      return regionInfo;
-    }
-    public Put getPut() {
-      return put;
-    }
-    public int getRetryCount() {
-      return retryCount;
-    }
   }

   /**
@@ -386,26 +386,38 @@ public class HTableMultiplexer {
    }
   }

-  private static class HTableFlushWorker implements Runnable {
+  private static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
-    private final Configuration conf;
-    private final ClusterConnection conn;
+    private final AsyncProcess asyncProc;
    private final LinkedBlockingQueue<PutStatus> queue;
-    private final HTableMultiplexer htableMultiplexer;
+    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
-    private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);
    private final ExecutorService pool;
+    private final List<PutStatus> processingList = new ArrayList<>();
+    private final ScheduledExecutorService executor;
+    private final int maxRetryInQueue;
+    private final AtomicInteger retryInQueue = new AtomicInteger(0);
+    private final int rpcTimeOutMs;

-    public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
+    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
+        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
-      this.conf = conf;
-      this.conn = conn;
-      this.htableMultiplexer = htableMultiplexer;
-      this.queue = queue;
+      this.asyncProc = conn.getAsyncProcess();
+      this.multiplexer = htableMultiplexer;
+      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      this.pool = pool;
+      this.executor = executor;
+      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+      this.rpcTimeOutMs =
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    }
+
+    protected LinkedBlockingQueue<PutStatus> getQueue() {
+      return this.queue;
    }

    public long getTotalFailedCount() {
@@ -413,7 +425,7 @@ public class HTableMultiplexer {
    }

    public long getTotalBufferedCount() {
-      return queue.size() + currentProcessingPutCount.get();
+      return queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
@@ -424,149 +436,146 @@ public class HTableMultiplexer {
      return this.maxLatency.getAndSet(0);
    }

-    private boolean resubmitFailedPut(PutStatus failedPutStatus,
-        HRegionLocation oldLoc) throws IOException {
-      Put failedPut = failedPutStatus.getPut();
-      // The currentPut is failed. So get the table name for the currentPut.
-      TableName tableName = failedPutStatus.getRegionInfo().getTable();
+    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
-      int retryCount = failedPutStatus.getRetryCount() - 1;
+      final int retryCount = ps.retryCount - 1;

      if (retryCount <= 0) {
        // Update the failed counter and no retry any more.
        return false;
-      } else {
-        // Retry one more time
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
      }
+
+      int cnt = retryInQueue.incrementAndGet();
+      if (cnt > maxRetryInQueue) {
+        // Too many Puts in queue for resubmit, give up this
+        retryInQueue.decrementAndGet();
+        return false;
+      }
+
+      final Put failedPut = ps.put;
+      // The currentPut is failed. So get the table name for the currentPut.
+      final TableName tableName = ps.regionInfo.getTable();
+
+      // Wait at least RPC timeout time
+      long delayMs = rpcTimeOutMs;
+      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
+          multiplexer.retryNum - retryCount)));
+
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          boolean succ = false;
+          try {
+            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+          } finally {
+            FlushWorker.this.retryInQueue.decrementAndGet();
+            if (!succ) {
+              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+            }
+          }
+        }
+      }, delayMs, TimeUnit.MILLISECONDS);
+      return true;
    }

    @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings
-        (value = "REC_CATCH_EXCEPTION", justification = "na")
    public void run() {
-      List<PutStatus> processingList = new ArrayList<>();
-      /**
-       * The frequency in milliseconds for the current thread to process the corresponding
-       * buffer queue.
-       **/
-      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
-
-      // initial delay
-      try {
-        Thread.sleep(frequency);
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while sleeping");
-        Thread.currentThread().interrupt();
-      }
-
-      AsyncProcess ap = conn.getAsyncProcess();
-
-      long start, elapsed;
      int failedCount = 0;
-      while (true) {
-        try {
-          start = elapsed = EnvironmentEdgeManager.currentTime();
+      try {
+        long start = EnvironmentEdgeManager.currentTime();

-          // Clear the processingList, putToStatusMap and failedCount
-          processingList.clear();
-          failedCount = 0;
-
-          // drain all the queued puts into the tmp list
-          queue.drainTo(processingList);
-          currentProcessingPutCount.set(processingList.size());
-
-          if (processingList.size() > 0) {
-            List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
-            MultiAction<Row> actions = new MultiAction<>();
-            for (int i = 0; i < processingList.size(); i++) {
-              PutStatus putStatus = processingList.get(i);
-              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
-              actions.add(putStatus.getRegionInfo().getRegionName(), action);
-              retainedActions.add(action);
-            }
-
-            // Process this multi-put request
-            List<PutStatus> failed = null;
-            Object[] results = new Object[actions.size()];
-            ServerName server = addr.getServerName();
-            Map<ServerName, MultiAction<Row>> actionsByServer =
-                Collections.singletonMap(server, actions);
-            try {
-              AsyncRequestFuture arf =
-                  ap.submitMultiActions(null, retainedActions, 0L, null, results,
-                    true, null, null, actionsByServer, pool);
-              arf.waitUntilDone();
-              if (arf.hasError()) {
-                throw arf.getErrors();
-              }
-            } catch (IOException e) {
-              LOG.debug("Caught some exceptions " + e
-                  + " when flushing puts to region server " + addr.getHostnamePort());
-            } finally {
-              // mutate list so that it is empty for complete success, or
-              // contains only failed records
-              // results are returned in the same order as the requests in list
-              // walk the list backwards, so we can remove from list without
-              // impacting the indexes of earlier members
-              for (int i = 0; i < results.length; i++) {
-                if (results[i] == null) {
-                  if (failed == null) {
-                    failed = new ArrayList<PutStatus>();
-                  }
-                  failed.add(processingList.get(i));
-                }
-              }
-            }
-
-            if (failed != null) {
-              // Resubmit failed puts
-              for (PutStatus putStatus : processingList) {
-                if (!resubmitFailedPut(putStatus, this.addr)) {
-                  failedCount++;
-                }
-              }
-              // Update the totalFailedCount
-              this.totalFailedPutCount.addAndGet(failedCount);
-            }
-
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-            // Update latency counters
-            averageLatency.add(elapsed);
-            if (elapsed > maxLatency.get()) {
-              maxLatency.set(elapsed);
-            }
-
-            // Log some basic info
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Processed " + currentProcessingPutCount
-                  + " put requests for " + addr.getHostnamePort() + " and "
-                  + failedCount + " failed" + ", latency for this send: "
-                  + elapsed);
-            }
-
-            // Reset the current processing put count
-            currentProcessingPutCount.set(0);
-          }
-
-          // Sleep for a while
-          if (elapsed == start) {
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-          }
-          if (elapsed < frequency) {
-            try {
-              Thread.sleep(frequency - elapsed);
-            } catch (InterruptedException e) {
-              LOG.warn("Interrupted while sleeping");
-              Thread.currentThread().interrupt();
-            }
-          }
-        } catch (Exception e) {
-          // Log all the exceptions and move on
-          LOG.debug("Caught some exceptions " + e
-              + " when flushing puts to region server "
-              + addr.getHostnamePort(), e);
+        // drain all the queued puts into the tmp list
+        processingList.clear();
+        queue.drainTo(processingList);
+        if (processingList.size() == 0) {
+          // Nothing to flush
+          return;
        }
+
+        currentProcessingCount.set(processingList.size());
+        // failedCount is decreased whenever a Put is success or resubmit.
+        failedCount = processingList.size();
+
+        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+        MultiAction<Row> actions = new MultiAction<>();
+        for (int i = 0; i < processingList.size(); i++) {
+          PutStatus putStatus = processingList.get(i);
+          Action<Row> action = new Action<Row>(putStatus.put, i);
+          actions.add(putStatus.regionInfo.getRegionName(), action);
+          retainedActions.add(action);
+        }
+
+        // Process this multi-put request
+        List<PutStatus> failed = null;
+        Object[] results = new Object[actions.size()];
+        ServerName server = addr.getServerName();
+        Map<ServerName, MultiAction<Row>> actionsByServer =
+            Collections.singletonMap(server, actions);
+        try {
+          AsyncRequestFuture arf =
+              asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+                null, actionsByServer, pool);
+          arf.waitUntilDone();
+          if (arf.hasError()) {
+            // We just log and ignore the exception here since failed Puts will be resubmit again.
+            LOG.debug("Caught some exceptions when flushing puts to region server "
+                + addr.getHostnamePort(), arf.getErrors());
+          }
+        } finally {
+          for (int i = 0; i < results.length; i++) {
+            if (results[i] == null) {
+              if (failed == null) {
+                failed = new ArrayList<PutStatus>();
+              }
+              failed.add(processingList.get(i));
+            } else {
+              failedCount--;
+            }
+          }
+        }
+
+        if (failed != null) {
+          // Resubmit failed puts
+          for (PutStatus putStatus : processingList) {
+            if (resubmitFailedPut(putStatus, this.addr)) {
+              failedCount--;
+            }
+          }
+        }
+
+        long elapsed = EnvironmentEdgeManager.currentTime() - start;
+        // Update latency counters
+        averageLatency.add(elapsed);
+        if (elapsed > maxLatency.get()) {
+          maxLatency.set(elapsed);
+        }
+
+        // Log some basic info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processed " + currentProcessingCount + " put requests for "
+              + addr.getHostnamePort() + " and " + failedCount + " failed"
+              + ", latency for this send: " + elapsed);
+        }
+
+        // Reset the current processing put count
+        currentProcessingCount.set(0);
+      } catch (RuntimeException e) {
+        // To make findbugs happy
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } finally {
+        // Update the totalFailedCount
+        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
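
For reference, a minimal usage sketch of the API this patch reshapes: puts are still queued per region server, but flushing now runs on a shared daemon ScheduledExecutorService every hbase.tablemultiplexer.flush.period.ms instead of one dedicated thread per region server. The sketch is illustrative only; the table name, row, column family, queue size, and the flush-period override are assumptions for the example, not values taken from the patch, and HBaseConfiguration, Bytes, and Put are the standard HBase client classes.

// Illustrative sketch, not part of the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // New config name introduced by this patch (replaces hbase.tablemultiplexer.flush.frequency.ms).
    conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 50);

    // Each region server gets its own buffer queue of at most 1000 queued puts.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);

    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));

    // Non-blocking: false means the target region server's queue rejected the put
    // (for example, it was full); the caller decides whether to retry.
    boolean queued = multiplexer.put(TableName.valueOf("example_table"), put);
    System.out.println("queued=" + queued);

    // Flushing happens on daemon threads, so keep the process alive long enough
    // for at least one periodic flush to run.
    Thread.sleep(200);
  }
}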