diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index d6992333fb2..045885fa65e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,17 +19,13 @@ package org.apache.hadoop.hbase.client; -import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; import java.io.InterruptedIOException; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -38,31 +34,39 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.RetryImmediatelyException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.backoff.ServerStatistics; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; -import com.google.common.annotations.VisibleForTesting; - /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -126,6 +130,25 @@ class AsyncProcess { private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; + /** + * The maximum size of single RegionServer. + */ + public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize"; + + /** + * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}. + */ + public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; + + /** + * The maximum size of submit. + */ + public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; + /** + * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}. + */ + public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; + /** * The context used to wait for results from one submit call. * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), @@ -208,7 +231,6 @@ class AsyncProcess { new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); protected final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap(); - // Start configuration settings. private final int startLogErrorsCnt; @@ -217,6 +239,11 @@ class AsyncProcess { */ protected final int maxTotalConcurrentTasks; + /** + * The max heap size of all tasks simultaneously executed on a server. + */ + protected final long maxHeapSizePerRequest; + protected final long maxHeapSizeSubmit; /** * The number of tasks we run in parallel on a single region. * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start @@ -278,7 +305,6 @@ class AsyncProcess { addresses.addAll(other.addresses); } } - public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, int rpcTimeout) { @@ -306,7 +332,9 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); - + this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -321,7 +349,15 @@ class AsyncProcess { throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + maxConcurrentTasksPerRegion); } + if (this.maxHeapSizePerRequest <= 0) { + throw new IllegalArgumentException("maxHeapSizePerServer=" + + maxHeapSizePerRequest); + } + if (this.maxHeapSizeSubmit <= 0) { + throw new IllegalArgumentException("maxHeapSizeSubmit=" + + maxHeapSizeSubmit); + } // Server tracker allows us to do faster, and yet useful (hopefully), retries. // However, if we are too useful, we might fail very quickly due to retry count limit. // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum @@ -356,16 +392,34 @@ class AsyncProcess { } throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); } - /** * See {@link #submit(ExecutorService, TableName, List, boolean, Batch.Callback, boolean)}. * Uses default ExecutorService for this AP (must have been created with one). */ - public AsyncRequestFuture submit(TableName tableName, List rows, + public AsyncRequestFuture submit(TableName tableName, final List rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } + /** + * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. + * Uses default ExecutorService for this AP (must have been created with one). + */ + public AsyncRequestFuture submit(TableName tableName, + final RowAccess rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults); + } + /** + * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. + * Uses the {@link ListRowAccess} to wrap the {@link List}. + */ + public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, + List rows, boolean atLeastOne, Batch.Callback callback, + boolean needResults) throws InterruptedIOException { + return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, + callback, needResults); + } /** * Extract from the rows list what we can submit. The rows we can not submit are kept in the @@ -380,7 +434,7 @@ class AsyncProcess { * @param atLeastOne true if we should submit at least a subset. */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - List rows, boolean atLeastOne, Batch.Callback callback, + RowAccess rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; @@ -396,16 +450,15 @@ class AsyncProcess { // Location errors that happen before we decide what requests to take. List locationErrors = null; List locationErrorRows = null; + RowCheckerHost checker = createRowCheckerHost(); + boolean firstIter = true; do { // Wait until there is at least one slot for a new task. waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); - - // Remember the previous decisions about regions or region servers we put in the - // final multi. - Map regionIncluded = new HashMap(); - Map serverIncluded = new HashMap(); - int posInList = -1; + if (!firstIter) { + checker.reset(); + } Iterator it = rows.iterator(); while (it.hasNext()) { Row r = it.next(); @@ -434,8 +487,12 @@ class AsyncProcess { it.remove(); break; // Backward compat: we stop considering actions on location error. } - - if (canTakeOperation(loc, regionIncluded, serverIncluded)) { + long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0; + ReturnCode code = checker.canTakeOperation(loc, rowSize); + if (code == ReturnCode.END) { + break; + } + if (code == ReturnCode.INCLUDE) { Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); @@ -445,6 +502,7 @@ class AsyncProcess { it.remove(); } } + firstIter = false; } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); if (retainedActions.isEmpty()) return NO_REQS_RESULT; @@ -453,6 +511,18 @@ class AsyncProcess { locationErrors, locationErrorRows, actionsByServer, pool); } + private RowCheckerHost createRowCheckerHost() { + return new RowCheckerHost(Arrays.asList( + new TaskCountChecker(maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, + taskCounterPerServer, + taskCounterPerRegion) + , new RequestSizeChecker(maxHeapSizePerRequest) + , new SubmittedSizeChecker(maxHeapSizeSubmit) + )); + } AsyncRequestFuture submitMultiActions(TableName tableName, List> retainedActions, long nonceGroup, Batch.Callback callback, Object[] results, boolean needResults, List locationErrors, @@ -494,74 +564,6 @@ class AsyncProcess { multiAction.add(regionName, action); } - - /** - * Check if we should send new operations to this region or region server. - * We're taking into account the past decision; if we have already accepted - * operation on a given region, we accept all operations for this region. - * - * @param loc; the region and the server name we want to use. - * @return true if this region is considered as busy. - */ - protected boolean canTakeOperation(HRegionLocation loc, - Map regionsIncluded, - Map serversIncluded) { - HRegionInfo regionInfo = loc.getRegionInfo(); - Boolean regionPrevious = regionsIncluded.get(regionInfo); - - if (regionPrevious != null) { - // We already know what to do with this region. - return regionPrevious; - } - - Boolean serverPrevious = serversIncluded.get(loc.getServerName()); - if (Boolean.FALSE.equals(serverPrevious)) { - // It's a new region, on a region server that we have already excluded. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); - if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { - // Too many tasks on this region already. - regionsIncluded.put(regionInfo, Boolean.FALSE); - return false; - } - - if (serverPrevious == null) { - // The region is ok, but we need to decide for this region server. - int newServers = 0; // number of servers we're going to contact so far - for (Map.Entry kv : serversIncluded.entrySet()) { - if (kv.getValue()) { - newServers++; - } - } - - // Do we have too many total tasks already? - boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks; - - if (ok) { - // If the total is fine, is it ok for this individual server? - AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); - ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); - } - - if (!ok) { - regionsIncluded.put(regionInfo, Boolean.FALSE); - serversIncluded.put(loc.getServerName(), Boolean.FALSE); - return false; - } - - serversIncluded.put(loc.getServerName(), Boolean.TRUE); - } else { - assert serverPrevious.equals(Boolean.TRUE); - } - - regionsIncluded.put(regionInfo, Boolean.TRUE); - - return true; - } - /** * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, Object[])}. * Uses default ExecutorService for this AP (must have been created with one). @@ -740,7 +742,7 @@ class AsyncProcess { private final int numAttempt; private final ServerName server; private final Set callsInProgress; - + private Long heapSize = null; private SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, Set callsInProgress) { @@ -750,6 +752,24 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } + @VisibleForTesting + long heapSize() { + if (heapSize != null) { + return heapSize; + } + heapSize = 0L; + for (Map.Entry>> e: this.multiAction.actions.entrySet()) { + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + heapSize += ((Mutation) row).heapSize(); + } + } + } + return heapSize; + } + @Override public void run() { MultiResponse res; @@ -831,7 +851,7 @@ class AsyncProcess { private final long nonceGroup; private CancellableRegionServerCallable currentCallable; private int currentCallTotalTimeout; - + private final Map> heapSizesByServer = new HashMap<>(); public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, CancellableRegionServerCallable callable, int timeout) { @@ -910,7 +930,21 @@ class AsyncProcess { public Set getCallsInProgress() { return callsInProgress; } + @VisibleForTesting + Map> getRequestHeapSize() { + return heapSizesByServer; + } + private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, + SingleServerRequestRunnable runnable) { + List heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new LinkedList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(runnable.heapSize()); + return runnable; + } /** * Group a list of actions per region servers, and send them. * @@ -1080,8 +1114,9 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } - return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress))); + SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } // group the actions by the amount of delay @@ -1102,9 +1137,8 @@ class AsyncProcess { List toReturn = new ArrayList(actions.size()); for (DelayingRunner runner : actions.values()) { String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, - callsInProgress); + Runnable runnable = addSingleServerRequestHeapSize(server, + new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1941,4 +1975,284 @@ class AsyncProcess { NO_RETRIES_EXHAUSTED, NO_OTHER_SUCCEEDED } + + /** + * Collect all advices from checkers and make the final decision. + */ + @VisibleForTesting + static class RowCheckerHost { + private final List checkers; + private boolean isEnd = false; + RowCheckerHost(final List checkers) { + this.checkers = checkers; + } + void reset() throws InterruptedIOException { + isEnd = false; + InterruptedIOException e = null; + for (RowChecker checker : checkers) { + try { + checker.reset(); + } catch (InterruptedIOException ex) { + e = ex; + } + } + if (e != null) { + throw e; + } + } + ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + if (isEnd) { + return ReturnCode.END; + } + ReturnCode code = ReturnCode.INCLUDE; + for (RowChecker checker : checkers) { + switch (checker.canTakeOperation(loc, rowSize)) { + case END: + isEnd = true; + code = ReturnCode.END; + break; + case SKIP: + code = ReturnCode.SKIP; + break; + case INCLUDE: + default: + break; + } + if (code == ReturnCode.END) { + break; + } + } + for (RowChecker checker : checkers) { + checker.notifyFinal(code, loc, rowSize); + } + return code; + } + } + + /** + * Provide a way to control the flow of rows iteration. + */ + @VisibleForTesting + interface RowChecker { + enum ReturnCode { + /** + * Accept current row. + */ + INCLUDE, + /** + * Skip current row. + */ + SKIP, + /** + * No more row can be included. + */ + END + }; + ReturnCode canTakeOperation(HRegionLocation loc, long rowSize); + /** + * Add the final ReturnCode to the checker. + * The ReturnCode may be reversed, so the checker need the final decision to update + * the inner state. + */ + void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize); + /** + * Reset the inner state. + */ + void reset() throws InterruptedIOException ; + } + + /** + * limit the heapsize of total submitted data. + * Reduce the limit of heapsize for submitting quickly + * if there is no running task. + */ + @VisibleForTesting + static class SubmittedSizeChecker implements RowChecker { + private final long maxHeapSizeSubmit; + private long heapSize = 0; + SubmittedSizeChecker(final long maxHeapSizeSubmit) { + this.maxHeapSizeSubmit = maxHeapSizeSubmit; + } + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + if (heapSize >= maxHeapSizeSubmit) { + return ReturnCode.END; + } + return ReturnCode.INCLUDE; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + heapSize += rowSize; + } + } + + @Override + public void reset() { + heapSize = 0; + } + } + /** + * limit the max number of tasks in an AsyncProcess. + */ + @VisibleForTesting + static class TaskCountChecker implements RowChecker { + private static final long MAX_WAITING_TIME = 1000; //ms + private final Set regionsIncluded = new HashSet<>(); + private final Set serversIncluded = new HashSet<>(); + private final int maxConcurrentTasksPerRegion; + private final int maxTotalConcurrentTasks; + private final int maxConcurrentTasksPerServer; + private final Map taskCounterPerRegion; + private final Map taskCounterPerServer; + private final Set busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + private final AtomicLong tasksInProgress; + TaskCountChecker(final int maxTotalConcurrentTasks, + final int maxConcurrentTasksPerServer, + final int maxConcurrentTasksPerRegion, + final AtomicLong tasksInProgress, + final Map taskCounterPerServer, + final Map taskCounterPerRegion) { + this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; + this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; + this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; + this.taskCounterPerRegion = taskCounterPerRegion; + this.taskCounterPerServer = taskCounterPerServer; + this.tasksInProgress = tasksInProgress; + } + @Override + public void reset() throws InterruptedIOException { + // prevent the busy-waiting + waitForRegion(); + regionsIncluded.clear(); + serversIncluded.clear(); + busyRegions.clear(); + } + private void waitForRegion() throws InterruptedIOException { + if (busyRegions.isEmpty()) { + return; + } + EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); + final long start = ee.currentTime(); + while ((ee.currentTime() - start) <= MAX_WAITING_TIME) { + for (byte[] region : busyRegions) { + AtomicInteger count = taskCounterPerRegion.get(region); + if (count == null || count.get() < maxConcurrentTasksPerRegion) { + return; + } + } + try { + synchronized (tasksInProgress) { + tasksInProgress.wait(10); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted." + + " tasksInProgress=" + tasksInProgress); + } + } + } + /** + * 1) check the regions is allowed. + * 2) check the concurrent tasks for regions. + * 3) check the total concurrent tasks. + * 4) check the concurrent tasks for server. + * @param loc + * @param rowSize + * @return + */ + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + + HRegionInfo regionInfo = loc.getRegionInfo(); + if (regionsIncluded.contains(regionInfo)) { + // We already know what to do with this region. + return ReturnCode.INCLUDE; + } + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); + if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { + // Too many tasks on this region already. + return ReturnCode.SKIP; + } + int newServers = serversIncluded.size() + + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); + if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { + // Too many tasks. + return ReturnCode.SKIP; + } + AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); + if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { + // Too many tasks for this individual server + return ReturnCode.SKIP; + } + return ReturnCode.INCLUDE; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + regionsIncluded.add(loc.getRegionInfo()); + serversIncluded.add(loc.getServerName()); + } + busyRegions.add(loc.getRegionInfo().getRegionName()); + } + } + + /** + * limit the request size for each regionserver. + */ + @VisibleForTesting + static class RequestSizeChecker implements RowChecker { + private final long maxHeapSizePerRequest; + private final Map serverRequestSizes = new HashMap<>(); + RequestSizeChecker(final long maxHeapSizePerRequest) { + this.maxHeapSizePerRequest = maxHeapSizePerRequest; + } + @Override + public void reset() { + serverRequestSizes.clear(); + } + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + // Is it ok for limit of request size? + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? + serverRequestSizes.get(loc.getServerName()) : 0L; + // accept at least one request + if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) { + return ReturnCode.INCLUDE; + } + return ReturnCode.SKIP; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? + serverRequestSizes.get(loc.getServerName()) : 0L; + serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize); + } + } + } + + public static class ListRowAccess implements RowAccess { + private final List data; + ListRowAccess(final List data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public Iterator iterator() { + return data.iterator(); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 39e4f75c6bf..2d4c8b36c8c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -69,6 +71,12 @@ public class BufferedMutatorImpl implements BufferedMutator { @VisibleForTesting AtomicLong currentWriteBufferSize = new AtomicLong(0); + /** + * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. + * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. + */ + @VisibleForTesting + AtomicInteger undealtMutationCount = new AtomicInteger(0); private long writeBufferSize; private final int maxKeyValueSize; private boolean closed = false; @@ -129,11 +137,13 @@ public class BufferedMutatorImpl implements BufferedMutator { } long toAddSize = 0; + int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { validatePut((Put) m); } toAddSize += m.heapSize(); + ++toAddCount; } // This behavior is highly non-intuitive... it does not protect us against @@ -142,14 +152,17 @@ public class BufferedMutatorImpl implements BufferedMutator { if (ap.hasError()) { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { currentWriteBufferSize.addAndGet(toAddSize); writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. - while (currentWriteBufferSize.get() > writeBufferSize) { + while (undealtMutationCount.get() != 0 + && currentWriteBufferSize.get() > writeBufferSize) { backgroundFlushCommits(false); } } @@ -208,58 +221,41 @@ public class BufferedMutatorImpl implements BufferedMutator { private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + if (!synchronous && writeAsyncBuffer.isEmpty()) { + return; + } - LinkedList buffer = new LinkedList<>(); - // Keep track of the size so that this thread doesn't spin forever - long dequeuedSize = 0; - - try { - // Grab all of the available mutations. - Mutation m; - - // If there's no buffer size drain everything. If there is a buffersize drain up to twice - // that amount. This should keep the loop from continually spinning if there are threads - // that keep adding more data to the buffer. - while ( - (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous) - && (m = writeAsyncBuffer.poll()) != null) { - buffer.add(m); - long size = m.heapSize(); - dequeuedSize += size; - currentWriteBufferSize.addAndGet(-size); - } - - if (!synchronous && dequeuedSize == 0) { - return; - } - - if (!synchronous) { - ap.submit(tableName, buffer, true, null, false); + if (!synchronous) { + QueueRowAccess taker = new QueueRowAccess(); + try { + ap.submit(tableName, taker, true, null, false); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); } + } finally { + taker.restoreRemainder(); } - if (synchronous || ap.hasError()) { - while (!buffer.isEmpty()) { - ap.submit(tableName, buffer, true, null, false); - } - RetriesExhaustedWithDetailsException error = - ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); - if (error != null) { - if (listener == null) { - throw error; - } else { - this.listener.onException(error, this); - } + } + if (synchronous || ap.hasError()) { + QueueRowAccess taker = new QueueRowAccess(); + try { + while (!taker.isEmpty()) { + ap.submit(tableName, taker, true, null, false); + taker.reset(); } + } finally { + taker.restoreRemainder(); } - } finally { - for (Mutation mut : buffer) { - long size = mut.heapSize(); - currentWriteBufferSize.addAndGet(size); - dequeuedSize -= size; - writeAsyncBuffer.add(mut); + + RetriesExhaustedWithDetailsException error = + ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); + if (error != null) { + if (listener == null) { + throw error; + } else { + this.listener.onException(error, this); + } } } } @@ -285,4 +281,67 @@ public class BufferedMutatorImpl implements BufferedMutator { public long getWriteBufferSize() { return this.writeBufferSize; } + + private class QueueRowAccess implements RowAccess { + private int remainder = undealtMutationCount.getAndSet(0); + + void reset() { + restoreRemainder(); + remainder = undealtMutationCount.getAndSet(0); + } + + @Override + public Iterator iterator() { + return new Iterator() { + private final Iterator iter = writeAsyncBuffer.iterator(); + private int countDown = remainder; + private Mutation last = null; + @Override + public boolean hasNext() { + if (countDown <= 0) { + return false; + } + return iter.hasNext(); + } + @Override + public Row next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + last = iter.next(); + if (last == null) { + throw new NoSuchElementException(); + } + --countDown; + return last; + } + @Override + public void remove() { + if (last == null) { + throw new IllegalStateException(); + } + iter.remove(); + currentWriteBufferSize.addAndGet(-last.heapSize()); + --remainder; + } + }; + } + + @Override + public int size() { + return remainder; + } + + void restoreRemainder() { + if (remainder > 0) { + undealtMutationCount.addAndGet(remainder); + remainder = 0; + } + } + + @Override + public boolean isEmpty() { + return remainder <= 0; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java new file mode 100644 index 00000000000..788f1a469d5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Provide a way to access the inner buffer. + * The purpose is to reduce the elapsed time to move a large number + * of elements between collections. + * @param + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +@VisibleForTesting +interface RowAccess extends Iterable { + /** + * @return true if there are no elements. + */ + boolean isEmpty(); + + /** + * @return the number of elements in this list. + */ + int size(); +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 59590785db5..516f2cf3982 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -25,8 +25,11 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; @@ -57,6 +60,12 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; +import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl; +import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; +import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; +import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -65,13 +74,34 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; -import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @Category({ClientTests.class, MediumTests.class}) public class TestAsyncProcess { @@ -180,6 +210,13 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } + @Override + public AsyncRequestFuture submit(TableName tableName, RowAccess rows, + boolean atLeastOne, Callback callback, boolean needResults) + throws InterruptedIOException { + // We use results in tests to check things, so override to always save them. + return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + } @Override public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Callback callback, boolean needResults) @@ -435,7 +472,186 @@ public class TestAsyncProcess { return null; } } + @Test + public void testListRowAccess() { + int count = 10; + List values = new LinkedList<>(); + for (int i = 0; i != count; ++i) { + values.add(String.valueOf(i)); + } + ListRowAccess taker = new ListRowAccess(values); + assertEquals(count, taker.size()); + + int restoreCount = 0; + int takeCount = 0; + Iterator it = taker.iterator(); + while (it.hasNext()) { + String v = it.next(); + assertEquals(String.valueOf(takeCount), v); + ++takeCount; + it.remove(); + if (Math.random() >= 0.5) { + break; + } + } + assertEquals(count, taker.size() + takeCount); + + it = taker.iterator(); + while (it.hasNext()) { + String v = it.next(); + assertEquals(String.valueOf(takeCount), v); + ++takeCount; + it.remove(); + } + assertEquals(0, taker.size()); + assertEquals(count, takeCount); + } + private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { + if (putSizePerServer <= maxHeapSizePerRequest) { + return 1; + } else if (putSizePerServer % maxHeapSizePerRequest == 0) { + return putSizePerServer / maxHeapSizePerRequest; + } else { + return putSizePerServer / maxHeapSizePerRequest + 1; + } + } + + @Test + public void testSubmitSameSizeOfRequest() throws Exception { + long writeBuffer = 2 * 1024 * 1024; + long putsHeapSize = writeBuffer; + doSubmitRequest(writeBuffer, putsHeapSize); + } + @Test + public void testIllegalArgument() throws IOException { + ClusterConnection conn = createHConnection(); + final long maxHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); + try { + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + fail("The maxHeapSizePerRequest must be bigger than zero"); + } catch (IllegalArgumentException e) { + } + conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); + } + @Test + public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { + long maxHeapSizePerRequest = Long.MAX_VALUE; + long putsHeapSize = 2 * 1024 * 1024; + doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); + } + + @Test(timeout=300000) + public void testSubmitRandomSizeRequest() throws Exception { + Random rn = new Random(); + final long limit = 10 * 1024 * 1024; + for (int count = 0; count != 2; ++count) { + long maxHeapSizePerRequest = Math.max(1, (Math.abs(rn.nextLong()) % limit)); + long putsHeapSize = Math.max(1, (Math.abs(rn.nextLong()) % limit)); + LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + ", putsHeapSize=" + putsHeapSize); + doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); + } + } + + @Test + public void testSubmitSmallRequest() throws Exception { + long maxHeapSizePerRequest = 2 * 1024 * 1024; + long putsHeapSize = 100; + doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); + } + + @Test(timeout=120000) + public void testSubmitLargeRequest() throws Exception { + long maxHeapSizePerRequest = 2 * 1024 * 1024; + long putsHeapSize = maxHeapSizePerRequest * 2; + doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); + } + + private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { + ClusterConnection conn = createHConnection(); + final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); + BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + + // sn has two regions + long putSizeSN = 0; + long putSizeSN2 = 0; + List puts = new ArrayList<>(); + while ((putSizeSN + putSizeSN2) <= putsHeapSize) { + Put put1 = new Put(DUMMY_BYTES_1); + put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); + Put put2 = new Put(DUMMY_BYTES_2); + put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); + Put put3 = new Put(DUMMY_BYTES_3); + put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); + putSizeSN += (put1.heapSize() + put2.heapSize()); + putSizeSN2 += put3.heapSize(); + puts.add(put1); + puts.add(put2); + puts.add(put3); + } + + int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); + int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); + LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN + ", putSizeSN2:" + putSizeSN2 + + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + + ", minCountSnRequest:" + minCountSnRequest + + ", minCountSn2Request:" + minCountSn2Request); + try (HTable ht = new HTable(conn, bufferParam)) { + MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); + ht.mutator.ap = ap; + + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + ht.put(puts); + List reqs = ap.allReqs; + + int actualSnReqCount = 0; + int actualSn2ReqCount = 0; + for (AsyncRequestFuture req : reqs) { + if (!(req instanceof AsyncRequestFutureImpl)) { + continue; + } + AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + if (ars.getRequestHeapSize().containsKey(sn)) { + ++actualSnReqCount; + } + if (ars.getRequestHeapSize().containsKey(sn2)) { + ++actualSn2ReqCount; + } + } + // If the server is busy, the actual count may be incremented. + assertEquals(true, minCountSnRequest <= actualSnReqCount); + assertEquals(true, minCountSn2Request <= actualSn2ReqCount); + Map sizePerServers = new HashMap<>(); + for (AsyncRequestFuture req : reqs) { + if (!(req instanceof AsyncRequestFutureImpl)) { + continue; + } + AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + Map> requestHeapSize = ars.getRequestHeapSize(); + for (Map.Entry> entry : requestHeapSize.entrySet()) { + long sum = 0; + for (long size : entry.getValue()) { + assertEquals(true, size <= maxHeapSizePerRequest); + sum += size; + } + assertEquals(true, sum <= maxHeapSizePerRequest); + long value = sizePerServers.containsKey(entry.getKey()) ? sizePerServers.get(entry.getKey()) : 0L; + sizePerServers.put(entry.getKey(), value + sum); + } + } + assertEquals(true, sizePerServers.containsKey(sn)); + assertEquals(true, sizePerServers.containsKey(sn2)); + assertEquals(false, sizePerServers.containsKey(sn3)); + assertEquals(putSizeSN, (long) sizePerServers.get(sn)); + assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); + } + // restore config. + conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest); + } @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection(); @@ -477,7 +693,9 @@ public class TestAsyncProcess { List puts = new ArrayList(); puts.add(createPut(1, true)); - ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { + ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); + } ap.submit(DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(puts.size(), 1); @@ -538,7 +756,7 @@ public class TestAsyncProcess { public void testSubmitTrue() throws IOException { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); ap.tasksInProgress.incrementAndGet(); - final AtomicInteger ai = new AtomicInteger(1); + final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion); ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); final AtomicBoolean checkPoint = new AtomicBoolean(false); @@ -672,6 +890,8 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(Arrays.asList(loc1, loc2, loc3)); setMockLocation(hc, FAILS, new RegionLocations(loc2)); return hc; } @@ -681,6 +901,18 @@ public class TestAsyncProcess { setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); + List locations = new ArrayList(); + for (HRegionLocation loc : hrls1.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls2.getRegionLocations()) { + locations.add(loc); + } + for (HRegionLocation loc : hrls3.getRegionLocations()) { + locations.add(loc); + } + Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(locations); return hc; } @@ -688,6 +920,8 @@ public class TestAsyncProcess { RegionLocations result) throws IOException { Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); + Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), + Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } private static ClusterConnection createHConnectionCommon() { @@ -788,6 +1022,195 @@ public class TestAsyncProcess { Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); } + @Test + public void testTaskCheckerHost() throws IOException { + final int maxTotalConcurrentTasks = 100; + final int maxConcurrentTasksPerServer = 2; + final int maxConcurrentTasksPerRegion = 1; + final AtomicLong tasksInProgress = new AtomicLong(0); + final Map taskCounterPerServer = new HashMap<>(); + final Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker countChecker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + // unlimiited + RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest); + RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker)); + + ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code); + + ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); + // rejected for size + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2); + + ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest); + // rejected for size + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code); + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L); + // rejected for count + assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code); + + // release the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L); + assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2); + } + + @Test + public void testRequestSizeCheckerr() throws IOException { + final long maxHeapSizePerRequest = 2 * 1024 * 1024; + final ClusterConnection conn = createHConnection(); + RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + // accept the data located on loc1 region. + ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode); + checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest); + + // the sn server reachs the limit. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + // the request to sn2 server should be accepted. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + } + + @Test + public void testSubmittedSizeChecker() { + final long maxHeapSizeSubmit = 2 * 1024 * 1024; + SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit); + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + + for (int i = 0; i != 10; ++i) { + checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit); + } + + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.END, include); + } + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc2, 100000); + assertEquals(ReturnCode.END, include); + } + checker.reset(); + for (int i = 0; i != 10; ++i) { + ReturnCode include = checker.canTakeOperation(loc1, 100000); + assertEquals(ReturnCode.INCLUDE, include); + } + } + @Test + public void testTaskCountChecker() throws InterruptedIOException { + long rowSize = 12345; + int maxTotalConcurrentTasks = 100; + int maxConcurrentTasksPerServer = 2; + int maxConcurrentTasksPerRegion = 1; + AtomicLong tasksInProgress = new AtomicLong(0); + Map taskCounterPerServer = new HashMap<>(); + Map taskCounterPerRegion = new HashMap<>(); + TaskCountChecker checker = new TaskCountChecker( + maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, taskCounterPerServer, taskCounterPerRegion); + + // inner state is unchanged. + for (int i = 0; i != 10; ++i) { + ReturnCode code = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + } + // add loc1 region. + ReturnCode code = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code); + checker.notifyFinal(code, loc1, rowSize); + + // fill the task slots for loc1. + taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100)); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, loc1, rowSize); + } + + // fill the task slots for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); + + // no task slots. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode); + checker.notifyFinal(excludeCode, loc3, rowSize); + } + + // release the tasks for loc3. + taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); + taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); + + // add loc3 region. + ReturnCode code3 = checker.canTakeOperation(loc3, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, code3); + checker.notifyFinal(code3, loc3, rowSize); + + // the region was previously accepted, so it must be accpted now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize); + assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, loc3, rowSize); + } + + checker.reset(); + // the region was previously accepted, + // but checker have reseted and task slots for loc1 is full. + // So it must be rejected now. + for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { + ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); + assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode); + checker.notifyFinal(includeCode, loc1, rowSize); + } + } + @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); @@ -818,7 +1241,6 @@ public class TestAsyncProcess { Assert.assertEquals(res[5], success); Assert.assertEquals(res[6], failure); } - @Test public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index e771a9256c5..e84b9e45ee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -69,7 +69,7 @@ import com.google.protobuf.Service; * mrmBuilder.addMutationRequest(m1); * mrmBuilder.addMutationRequest(m2); * CoprocessorRpcChannel channel = t.coprocessorService(ROW); - * MultiRowMutationService.BlockingInterface service = + * MultiRowMutationService.BlockingInterface service = * MultiRowMutationService.newBlockingStub(channel); * MutateRowsRequest mrm = mrmBuilder.build(); * service.mutateRows(null, mrm);