From dad90f6cce5bc51e43e3778068d5e45bcb1c9de0 Mon Sep 17 00:00:00 2001
From: Chia-Ping Tsai
Date: Sat, 17 Feb 2018 07:16:14 +0800
Subject: [PATCH] HBASE-19680 BufferedMutatorImpl#mutate should wait the result
from AP in order to throw the failed mutations
---
.../hadoop/hbase/client/AsyncProcess.java | 54 +---
.../hbase/client/AsyncRequestFuture.java | 6 +-
.../hbase/client/AsyncRequestFutureImpl.java | 3 +-
.../hbase/client/BufferedMutatorImpl.java | 249 ++++++++----------
.../client/ConnectionImplementation.java | 2 +-
.../apache/hadoop/hbase/client/HTable.java | 7 +-
.../hbase/client/HTableMultiplexer.java | 2 +-
.../apache/hadoop/hbase/client/RowAccess.java | 4 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 90 +++----
.../TestAsyncProcessWithRegionException.java | 2 +-
.../client/HConnectionTestingUtility.java | 2 +-
11 files changed, 175 insertions(+), 246 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 6c4118c5132..de7449bf285 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -65,17 +65,12 @@ import org.apache.hadoop.hbase.util.Bytes;
* The class manages internally the retries.
*
*
- * The class can be constructed in regular mode, or "global error" mode. In global error mode,
- * AP tracks errors across all calls (each "future" also has global view of all errors). That
- * mode is necessary for backward compat with HTable behavior, where multiple submissions are
- * made and the errors can propagate using any put/flush call, from previous calls.
- * In "regular" mode, the errors are tracked inside the Future object that is returned.
+ * The errors are tracked inside the Future object that is returned.
* The results are always tracked inside the Future object and can be retrieved when the call
* has finished. Partial results can also be retrieved if some part of multi-request failed.
*
*
- * This class is thread safe in regular mode; in global error code, submitting operations and
- * retrieving errors from different threads may be not thread safe.
+ * This class is thread safe.
* Internally, the class is thread safe enough to manage simultaneously new submission and results
* arising from older operations.
*
@@ -144,7 +139,6 @@ class AsyncProcess {
final ClusterConnection connection;
private final RpcRetryingCallerFactory rpcCallerFactory;
final RpcControllerFactory rpcFactory;
- final BatchErrors globalErrors;
// Start configuration settings.
final int startLogErrorsCnt;
@@ -168,14 +162,12 @@ class AsyncProcess {
private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
private final int periodToLog;
AsyncProcess(ClusterConnection hc, Configuration conf,
- RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
- RpcControllerFactory rpcFactory) {
+ RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
this.connection = hc;
- this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
this.id = COUNTER.incrementAndGet();
@@ -445,10 +437,10 @@ class AsyncProcess {
private Consumer getLogger(TableName tableName, long max) {
return (currentInProgress) -> {
- LOG.info("#" + id + (max < 0 ? ", waiting for any free slot"
- : ", waiting for some tasks to finish. Expected max="
- + max) + ", tasksInProgress=" + currentInProgress +
- " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
+ LOG.info("#" + id + (max < 0 ?
+ ", waiting for any free slot" :
+ ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
+ + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
};
}
@@ -460,38 +452,6 @@ class AsyncProcess {
void decTaskCounters(Collection regions, ServerName sn) {
requestController.decTaskCounters(regions, sn);
}
- /**
- * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
- * @return Whether there were any errors in any request since the last time
- * {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created.
- */
- public boolean hasError() {
- return globalErrors != null && globalErrors.hasErrors();
- }
-
- /**
- * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
- * Waits for all previous operations to finish, and returns errors and (optionally)
- * failed operations themselves.
- * @param failedRows an optional list into which the rows that failed since the last time
- * {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created, are saved.
- * @param tableName name of the table
- * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, TableName)}
- * was called, or AP was created.
- */
- public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
- List failedRows, TableName tableName) throws InterruptedIOException {
- waitForMaximumCurrentTasks(0, tableName);
- if (globalErrors == null || !globalErrors.hasErrors()) {
- return null;
- }
- if (failedRows != null) {
- failedRows.addAll(globalErrors.actions);
- }
- RetriesExhaustedWithDetailsException result = globalErrors.makeException(logBatchErrorDetails);
- globalErrors.clear();
- return result;
- }
/**
* Create a caller. Isolated to be easily overridden in the tests.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
index 90bd235dcba..b91e094d340 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
@@ -24,10 +24,8 @@ import java.io.InterruptedIOException;
import java.util.List;
/**
- * The context used to wait for results from one submit call.
- * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
- * then errors and failed operations in this object will reflect global errors.
- * 2) If submit call is made with needResults false, results will not be saved.
+ * The context used to wait for results from one submit call. If submit call is made with
+ * needResults false, results will not be saved.
* @since 2.0.0
*/
@InterfaceAudience.Private
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index ace74f97ea0..a8b8ebfa6a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -388,8 +388,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
new ConcurrentHashMap());
this.asyncProcess = asyncProcess;
this.errorsByServer = createServerErrorTracker();
- this.errors = (asyncProcess.globalErrors != null)
- ? asyncProcess.globalErrors : new BatchErrors();
+ this.errors = new BatchErrors();
this.operationTimeout = task.getOperationTimeout();
this.rpcTimeout = task.getRpcTimeout();
this.currentCallable = task.getCallable();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index b171fc404d3..9d24b4daa90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -16,8 +16,11 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
+
+import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -141,7 +144,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
this(conn, params,
// puts need to track errors globally due to how the APIs currently work.
- new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
+ new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
+ }
+
+ private void checkClose() {
+ if (closed) {
+ throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
+ }
}
@VisibleForTesting
@@ -173,16 +182,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override
public void mutate(List extends Mutation> ms) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
-
- if (closed) {
- throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
- }
+ checkClose();
long toAddSize = 0;
int toAddCount = 0;
for (Mutation m : ms) {
if (m instanceof Put) {
- validatePut((Put) m);
+ HTable.validatePut((Put) m, maxKeyValueSize);
}
toAddSize += m.heapSize();
++toAddCount;
@@ -191,26 +197,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (currentWriteBufferSize.get() == 0) {
firstRecordInBufferTimestamp.set(System.currentTimeMillis());
}
-
- // This behavior is highly non-intuitive... it does not protect us against
- // 94-incompatible behavior, which is a timing issue because hasError, the below code
- // and setter of hasError are not synchronized. Perhaps it should be removed.
- if (ap.hasError()) {
- currentWriteBufferSize.addAndGet(toAddSize);
- writeAsyncBuffer.addAll(ms);
- undealtMutationCount.addAndGet(toAddCount);
- backgroundFlushCommits(true);
- } else {
- currentWriteBufferSize.addAndGet(toAddSize);
- writeAsyncBuffer.addAll(ms);
- undealtMutationCount.addAndGet(toAddCount);
- }
-
- // Now try and queue what needs to be queued.
- while (undealtMutationCount.get() != 0
- && currentWriteBufferSize.get() > writeBufferSize) {
- backgroundFlushCommits(false);
- }
+ currentWriteBufferSize.addAndGet(toAddSize);
+ writeAsyncBuffer.addAll(ms);
+ undealtMutationCount.addAndGet(toAddCount);
+ doFlush(false);
}
@VisibleForTesting
@@ -238,118 +228,40 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
}
- // validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException {
- HTable.validatePut(put, maxKeyValueSize);
- }
-
@Override
public synchronized void close() throws IOException {
- try {
- if (this.closed) {
- return;
- }
-
- // Stop any running Periodic Flush timer.
- disableWriteBufferPeriodicFlush();
-
- // As we can have an operation in progress even if the buffer is empty, we call
- // backgroundFlushCommits at least one time.
- backgroundFlushCommits(true);
- if (cleanupPoolOnClose) {
- this.pool.shutdown();
- boolean terminated;
- int loopCnt = 0;
- do {
- // wait until the pool has terminated
- terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
- loopCnt += 1;
- if (loopCnt >= 10) {
- LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
- break;
- }
- } while (!terminated);
- }
- } catch (InterruptedException e) {
- LOG.warn("waitForTermination interrupted");
- } finally {
- this.closed = true;
- }
- }
-
- @Override
- public synchronized void flush() throws InterruptedIOException,
- RetriesExhaustedWithDetailsException {
- // As we can have an operation in progress even if the buffer is empty, we call
- // backgroundFlushCommits at least one time.
- backgroundFlushCommits(true);
- }
-
- /**
- * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
- * the is an error (max retried reach from a previous flush or bad operation), it tries to send
- * all operations in the buffer and sends an exception.
- *
- * @param synchronous - if true, sends all the writes and wait for all of them to finish before
- * returning.
- */
- private void backgroundFlushCommits(boolean synchronous) throws
- InterruptedIOException,
- RetriesExhaustedWithDetailsException {
- if (!synchronous && writeAsyncBuffer.isEmpty()) {
+ if (closed) {
return;
}
-
- if (!synchronous) {
- QueueRowAccess taker = new QueueRowAccess();
- AsyncProcessTask task = wrapAsyncProcessTask(taker);
- try {
- ap.submit(task);
- if (ap.hasError()) {
- LOG.debug(tableName + ": One or more of the operations have failed -"
- + " waiting for all operation in progress to finish (successfully or not)");
- }
- } finally {
- taker.restoreRemainder();
- }
- }
- if (synchronous || ap.hasError()) {
- QueueRowAccess taker = new QueueRowAccess();
- AsyncProcessTask task = wrapAsyncProcessTask(taker);
- try {
- while (!taker.isEmpty()) {
- ap.submit(task);
- taker.reset();
- }
- } finally {
- taker.restoreRemainder();
- }
- RetriesExhaustedWithDetailsException error =
- ap.waitForAllPreviousOpsAndReset(null, tableName);
- if (error != null) {
- if (listener == null) {
- throw error;
- } else {
- this.listener.onException(error, this);
+ // Stop any running Periodic Flush timer.
+ disableWriteBufferPeriodicFlush();
+ try {
+ // As we can have an operation in progress even if the buffer is empty, we call
+ // doFlush at least one time.
+ doFlush(true);
+ } finally {
+ if (cleanupPoolOnClose) {
+ this.pool.shutdown();
+ try {
+ if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
+ LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("waitForTermination interrupted");
+ Thread.currentThread().interrupt();
}
}
+ closed = true;
}
}
- /**
- * Reuse the AsyncProcessTask when calling
- * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
- * @param taker access the inner buffer.
- * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
- */
- private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
- AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ private AsyncProcessTask createTask(QueueRowAccess access) {
+ return new AsyncProcessTask(AsyncProcessTask.newBuilder()
.setPool(pool)
.setTableName(tableName)
- .setRowAccess(taker)
+ .setRowAccess(access)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
- .build();
- return new AsyncProcessTask(task) {
+ .build()) {
@Override
public int getRpcTimeout() {
return rpcTimeout.get();
@@ -362,6 +274,72 @@ public class BufferedMutatorImpl implements BufferedMutator {
};
}
+ @Override
+ public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ checkClose();
+ doFlush(true);
+ }
+
+ /**
+ * Send the operations in the buffer to the servers.
+ *
+ * @param flushAll - if true, sends all the writes and wait for all of them to finish before
+ * returning. Otherwise, flush until buffer size is smaller than threshold
+ */
+ private void doFlush(boolean flushAll) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ List errors = new ArrayList<>();
+ while (true) {
+ if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
+ // There is the room to accept more mutations.
+ break;
+ }
+ AsyncRequestFuture asf;
+ try (QueueRowAccess access = new QueueRowAccess()) {
+ if (access.isEmpty()) {
+ // It means someone has gotten the ticker to run the flush.
+ break;
+ }
+ asf = ap.submit(createTask(access));
+ }
+ // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
+ // be released.
+ asf.waitUntilDone();
+ if (asf.hasError()) {
+ errors.add(asf.getErrors());
+ }
+ }
+
+ RetriesExhaustedWithDetailsException exception = makeException(errors);
+ if (exception == null) {
+ return;
+ } else if(listener == null) {
+ throw exception;
+ } else {
+ listener.onException(exception, this);
+ }
+ }
+
+ private static RetriesExhaustedWithDetailsException makeException(
+ List errors) {
+ switch (errors.size()) {
+ case 0:
+ return null;
+ case 1:
+ return errors.get(0);
+ default:
+ List exceptions = new ArrayList<>();
+ List actions = new ArrayList<>();
+ List hostnameAndPort = new ArrayList<>();
+ errors.forEach(e -> {
+ exceptions.addAll(e.exceptions);
+ actions.addAll(e.actions);
+ hostnameAndPort.addAll(e.hostnameAndPort);
+ });
+ return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -433,12 +411,15 @@ public class BufferedMutatorImpl implements BufferedMutator {
return undealtMutationCount.get();
}
- private class QueueRowAccess implements RowAccess {
+ private class QueueRowAccess implements RowAccess, Closeable {
private int remainder = undealtMutationCount.getAndSet(0);
- void reset() {
- restoreRemainder();
- remainder = undealtMutationCount.getAndSet(0);
+ @Override
+ public void close() {
+ if (remainder > 0) {
+ undealtMutationCount.addAndGet(remainder);
+ remainder = 0;
+ }
}
@Override
@@ -474,6 +455,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
iter.remove();
currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder;
+ last = null;
}
};
}
@@ -483,13 +465,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
return remainder;
}
- void restoreRemainder() {
- if (remainder > 0) {
- undealtMutationCount.addAndGet(remainder);
- remainder = 0;
- }
- }
-
@Override
public boolean isEmpty() {
return remainder <= 0;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8807884fdc9..a5a01881681 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -261,7 +261,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
- this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
+ this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
} else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 855005efdbc..a4289e939a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1182,10 +1182,9 @@ public class HTable implements Table {
final List callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];
- AsyncProcess asyncProcess =
- new AsyncProcess(connection, configuration,
- RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration));
+ AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
+ RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
+ RpcControllerFactory.instantiate(configuration));
Callback resultsCallback
= (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 4be39a9c21e..e6b061e45fd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -452,7 +452,7 @@ public class HTableMultiplexer {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
+ this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
this.pool = pool;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
index 16921fe297a..9f92c66d317 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -27,7 +25,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* of elements between collections.
* @param
*/
-@InterfaceAudience.Public
+@InterfaceAudience.Private
public interface RowAccess extends Iterable {
/**
* @return true if there are no elements.
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index fc405df031b..2979dcd5e35 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -171,22 +170,17 @@ public class TestAsyncProcess {
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
- this(hc, conf, new AtomicInteger());
+ super(hc, conf,
+ new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+ service = Executors.newFixedThreadPool(5);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
}
- public MyAsyncProcess(
- ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
- super(hc, conf,
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
- service = Executors.newFixedThreadPool(5);
- }
-
public AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List extends Row> rows, boolean atLeastOne, Batch.Callback callback,
boolean needResults) throws InterruptedIOException {
@@ -324,7 +318,7 @@ public class TestAsyncProcess {
private final IOException ioe;
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
- super(hc, conf, true);
+ super(hc, conf);
this.ioe = ioe;
serverTrackerTimeout = 1L;
}
@@ -656,7 +650,7 @@ public class TestAsyncProcess {
+ ", minCountSnRequest:" + minCountSnRequest
+ ", minCountSn2Request:" + minCountSn2Request);
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
mutator.mutate(puts);
@@ -807,7 +801,7 @@ public class TestAsyncProcess {
@Test
public void testFail() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List puts = new ArrayList<>(1);
Put p = createPut(1, false);
@@ -834,7 +828,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitTrue() throws IOException {
ClusterConnection conn = createHConnection();
- final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+ final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@@ -882,7 +876,7 @@ public class TestAsyncProcess {
@Test
public void testFailAndSuccess() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@@ -909,7 +903,7 @@ public class TestAsyncProcess {
@Test
public void testFlush() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@@ -927,7 +921,7 @@ public class TestAsyncProcess {
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
testTaskCount(ap);
}
@@ -944,7 +938,7 @@ public class TestAsyncProcess {
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
SimpleRequestController.class.getName());
- MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
testTaskCount(ap);
if (defaultClazz != null) {
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@@ -981,7 +975,7 @@ public class TestAsyncProcess {
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
SimpleRequestController.class.getName());
- final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+ final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
SimpleRequestController controller = (SimpleRequestController) ap.requestController;
@@ -1087,7 +1081,7 @@ public class TestAsyncProcess {
@Test
public void testHTablePutSuccess() throws Exception {
ClusterConnection conn = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@@ -1104,7 +1098,7 @@ public class TestAsyncProcess {
@Test
public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
ClusterConnection conn = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
checkPeriodicFlushParameters(conn, ap,
1234, 1234,
@@ -1150,7 +1144,7 @@ public class TestAsyncProcess {
@Test
public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
ClusterConnection conn = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
@@ -1217,7 +1211,7 @@ public class TestAsyncProcess {
@Test
public void testBufferedMutatorImplWithSharedPool() throws Exception {
ClusterConnection conn = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@@ -1226,30 +1220,27 @@ public class TestAsyncProcess {
}
@Test
- public void testHTableFailedPutAndNewPut() throws Exception {
+ public void testFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
.writeBufferSize(0);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
Put p = createPut(1, false);
- mutator.mutate(p);
-
- ap.waitForMaximumCurrentTasks(0, null); // Let's do all the retries.
-
- // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
- // doPut if it fails.
- // This said, it's not a very easy going behavior. For example, when we insert a list of
- // puts, we may raise an exception in the middle of the list. It's then up to the caller to
- // manage what was inserted, what was tried but failed, and what was not even tried.
- p = createPut(1, true);
- Assert.assertEquals(0, mutator.size());
try {
mutator.mutate(p);
Assert.fail();
- } catch (RetriesExhaustedException expected) {
+ } catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
+ // Let's do all the retries.
+ ap.waitForMaximumCurrentTasks(0, null);
+ Assert.assertEquals(0, mutator.size());
+
+ // There is no global error so the new put should not fail
+ mutator.mutate(createPut(1, true));
Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
}
@@ -1277,7 +1268,7 @@ public class TestAsyncProcess {
public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(CONF);
HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
- ht.multiAp = new MyAsyncProcess(conn, CONF, false);
+ ht.multiAp = new MyAsyncProcess(conn, CONF);
List puts = new ArrayList<>(7);
puts.add(createPut(1, true));
@@ -1307,7 +1298,7 @@ public class TestAsyncProcess {
public void testErrorsServers() throws IOException {
Configuration configuration = new Configuration(CONF);
ClusterConnection conn = new MyConnectionImpl(configuration);
- MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
@@ -1323,21 +1314,22 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
- @Ignore @Test // Test is failing with wrong count. FIX!!
+ @Test
public void testReadAndWriteTimeout() throws IOException {
final long readTimeout = 10 * 1000;
final long writeTimeout = 20 * 1000;
Configuration copyConf = new Configuration(CONF);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
- ClusterConnection conn = createHConnection();
- Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
- MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ ClusterConnection conn = new MyConnectionImpl(copyConf);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
ht.multiAp = ap;
List gets = new LinkedList<>();
@@ -1368,7 +1360,7 @@ public class TestAsyncProcess {
}
@Test
- public void testGlobalErrors() throws IOException {
+ public void testErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(CONF);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@@ -1383,6 +1375,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@@ -1404,6 +1398,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@@ -1703,7 +1699,7 @@ public class TestAsyncProcess {
static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
+ super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
conf));
}
}
@@ -1759,6 +1755,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
long actualSleep = System.currentTimeMillis() - startTime;
long expectedSleep = 0L;
@@ -1784,6 +1782,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
+ assertEquals(1, expected.getNumExceptions());
+ assertTrue(expected.getRow(0) == p);
}
actualSleep = System.currentTimeMillis() - startTime;
expectedSleep = 0L;
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
index c46385e5a4d..ffc4e5192f9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
@@ -202,7 +202,7 @@ public class TestAsyncProcessWithRegionException {
private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
- super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
}
public AsyncRequestFuture submit(TableName tableName, List extends Row> rows)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 8ef784c7728..0f896b33075 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -127,7 +127,7 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
- new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
+ new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
RpcControllerFactory.instantiate(conf)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,