From 34d5f2b70eff408c884401ae79ea17eff9f53a7f Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 17 Feb 2018 07:16:14 +0800 Subject: [PATCH] HBASE-19680 BufferedMutatorImpl#mutate should wait the result from AP in order to throw the failed mutations --- .../hadoop/hbase/client/AsyncProcess.java | 54 +--- .../hbase/client/AsyncRequestFuture.java | 6 +- .../hbase/client/AsyncRequestFutureImpl.java | 3 +- .../hbase/client/BufferedMutatorImpl.java | 249 ++++++++---------- .../client/ConnectionImplementation.java | 2 +- .../apache/hadoop/hbase/client/HTable.java | 7 +- .../hbase/client/HTableMultiplexer.java | 2 +- .../apache/hadoop/hbase/client/RowAccess.java | 4 +- .../hadoop/hbase/client/TestAsyncProcess.java | 90 +++---- .../TestAsyncProcessWithRegionException.java | 2 +- .../client/HConnectionTestingUtility.java | 2 +- 11 files changed, 175 insertions(+), 246 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6c4118c5132..de7449bf285 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -65,17 +65,12 @@ import org.apache.hadoop.hbase.util.Bytes; * The class manages internally the retries. *

*

- * The class can be constructed in regular mode, or "global error" mode. In global error mode, - * AP tracks errors across all calls (each "future" also has global view of all errors). That - * mode is necessary for backward compat with HTable behavior, where multiple submissions are - * made and the errors can propagate using any put/flush call, from previous calls. - * In "regular" mode, the errors are tracked inside the Future object that is returned. + * The errors are tracked inside the Future object that is returned. * The results are always tracked inside the Future object and can be retrieved when the call * has finished. Partial results can also be retrieved if some part of multi-request failed. *

*

- * This class is thread safe in regular mode; in global error code, submitting operations and - * retrieving errors from different threads may be not thread safe. + * This class is thread safe. * Internally, the class is thread safe enough to manage simultaneously new submission and results * arising from older operations. *

@@ -144,7 +139,6 @@ class AsyncProcess { final ClusterConnection connection; private final RpcRetryingCallerFactory rpcCallerFactory; final RpcControllerFactory rpcFactory; - final BatchErrors globalErrors; // Start configuration settings. final int startLogErrorsCnt; @@ -168,14 +162,12 @@ class AsyncProcess { private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000; private final int periodToLog; AsyncProcess(ClusterConnection hc, Configuration conf, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory) { + RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } this.connection = hc; - this.globalErrors = useGlobalErrors ? new BatchErrors() : null; this.id = COUNTER.incrementAndGet(); @@ -445,10 +437,10 @@ class AsyncProcess { private Consumer getLogger(TableName tableName, long max) { return (currentInProgress) -> { - LOG.info("#" + id + (max < 0 ? ", waiting for any free slot" - : ", waiting for some tasks to finish. Expected max=" - + max) + ", tasksInProgress=" + currentInProgress + - " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName)); + LOG.info("#" + id + (max < 0 ? + ", waiting for any free slot" : + ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress=" + + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName)); }; } @@ -460,38 +452,6 @@ class AsyncProcess { void decTaskCounters(Collection regions, ServerName sn) { requestController.decTaskCounters(regions, sn); } - /** - * Only used w/useGlobalErrors ctor argument, for HTable backward compat. - * @return Whether there were any errors in any request since the last time - * {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created. - */ - public boolean hasError() { - return globalErrors != null && globalErrors.hasErrors(); - } - - /** - * Only used w/useGlobalErrors ctor argument, for HTable backward compat. - * Waits for all previous operations to finish, and returns errors and (optionally) - * failed operations themselves. - * @param failedRows an optional list into which the rows that failed since the last time - * {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created, are saved. - * @param tableName name of the table - * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, TableName)} - * was called, or AP was created. - */ - public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List failedRows, TableName tableName) throws InterruptedIOException { - waitForMaximumCurrentTasks(0, tableName); - if (globalErrors == null || !globalErrors.hasErrors()) { - return null; - } - if (failedRows != null) { - failedRows.addAll(globalErrors.actions); - } - RetriesExhaustedWithDetailsException result = globalErrors.makeException(logBatchErrorDetails); - globalErrors.clear(); - return result; - } /** * Create a caller. Isolated to be easily overridden in the tests. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java index 90bd235dcba..b91e094d340 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java @@ -24,10 +24,8 @@ import java.io.InterruptedIOException; import java.util.List; /** - * The context used to wait for results from one submit call. - * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), - * then errors and failed operations in this object will reflect global errors. - * 2) If submit call is made with needResults false, results will not be saved. + * The context used to wait for results from one submit call. If submit call is made with + * needResults false, results will not be saved. * @since 2.0.0 */ @InterfaceAudience.Private diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index ace74f97ea0..a8b8ebfa6a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -388,8 +388,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { new ConcurrentHashMap()); this.asyncProcess = asyncProcess; this.errorsByServer = createServerErrorTracker(); - this.errors = (asyncProcess.globalErrors != null) - ? asyncProcess.globalErrors : new BatchErrors(); + this.errors = new BatchErrors(); this.operationTimeout = task.getOperationTimeout(); this.rpcTimeout = task.getRpcTimeout(); this.currentCallable = task.getCallable(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index b171fc404d3..9d24b4daa90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -16,8 +16,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; + +import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -141,7 +144,13 @@ public class BufferedMutatorImpl implements BufferedMutator { RpcControllerFactory rpcFactory, BufferedMutatorParams params) { this(conn, params, // puts need to track errors globally due to how the APIs currently work. - new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory)); + new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory)); + } + + private void checkClose() { + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } } @VisibleForTesting @@ -173,16 +182,13 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public void mutate(List ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - - if (closed) { - throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); - } + checkClose(); long toAddSize = 0; int toAddCount = 0; for (Mutation m : ms) { if (m instanceof Put) { - validatePut((Put) m); + HTable.validatePut((Put) m, maxKeyValueSize); } toAddSize += m.heapSize(); ++toAddCount; @@ -191,26 +197,10 @@ public class BufferedMutatorImpl implements BufferedMutator { if (currentWriteBufferSize.get() == 0) { firstRecordInBufferTimestamp.set(System.currentTimeMillis()); } - - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - currentWriteBufferSize.addAndGet(toAddSize); - writeAsyncBuffer.addAll(ms); - undealtMutationCount.addAndGet(toAddCount); - backgroundFlushCommits(true); - } else { - currentWriteBufferSize.addAndGet(toAddSize); - writeAsyncBuffer.addAll(ms); - undealtMutationCount.addAndGet(toAddCount); - } - - // Now try and queue what needs to be queued. - while (undealtMutationCount.get() != 0 - && currentWriteBufferSize.get() > writeBufferSize) { - backgroundFlushCommits(false); - } + currentWriteBufferSize.addAndGet(toAddSize); + writeAsyncBuffer.addAll(ms); + undealtMutationCount.addAndGet(toAddCount); + doFlush(false); } @VisibleForTesting @@ -238,118 +228,40 @@ public class BufferedMutatorImpl implements BufferedMutator { } } - // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException { - HTable.validatePut(put, maxKeyValueSize); - } - @Override public synchronized void close() throws IOException { - try { - if (this.closed) { - return; - } - - // Stop any running Periodic Flush timer. - disableWriteBufferPeriodicFlush(); - - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); - if (cleanupPoolOnClose) { - this.pool.shutdown(); - boolean terminated; - int loopCnt = 0; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - loopCnt += 1; - if (loopCnt >= 10) { - LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); - break; - } - } while (!terminated); - } - } catch (InterruptedException e) { - LOG.warn("waitForTermination interrupted"); - } finally { - this.closed = true; - } - } - - @Override - public synchronized void flush() throws InterruptedIOException, - RetriesExhaustedWithDetailsException { - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); - } - - /** - * Send the operations in the buffer to the servers. Does not wait for the server's answer. If - * the is an error (max retried reach from a previous flush or bad operation), it tries to send - * all operations in the buffer and sends an exception. - * - * @param synchronous - if true, sends all the writes and wait for all of them to finish before - * returning. - */ - private void backgroundFlushCommits(boolean synchronous) throws - InterruptedIOException, - RetriesExhaustedWithDetailsException { - if (!synchronous && writeAsyncBuffer.isEmpty()) { + if (closed) { return; } - - if (!synchronous) { - QueueRowAccess taker = new QueueRowAccess(); - AsyncProcessTask task = wrapAsyncProcessTask(taker); - try { - ap.submit(task); - if (ap.hasError()) { - LOG.debug(tableName + ": One or more of the operations have failed -" - + " waiting for all operation in progress to finish (successfully or not)"); - } - } finally { - taker.restoreRemainder(); - } - } - if (synchronous || ap.hasError()) { - QueueRowAccess taker = new QueueRowAccess(); - AsyncProcessTask task = wrapAsyncProcessTask(taker); - try { - while (!taker.isEmpty()) { - ap.submit(task); - taker.reset(); - } - } finally { - taker.restoreRemainder(); - } - RetriesExhaustedWithDetailsException error = - ap.waitForAllPreviousOpsAndReset(null, tableName); - if (error != null) { - if (listener == null) { - throw error; - } else { - this.listener.onException(error, this); + // Stop any running Periodic Flush timer. + disableWriteBufferPeriodicFlush(); + try { + // As we can have an operation in progress even if the buffer is empty, we call + // doFlush at least one time. + doFlush(true); + } finally { + if (cleanupPoolOnClose) { + this.pool.shutdown(); + try { + if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + } + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + Thread.currentThread().interrupt(); } } + closed = true; } } - /** - * Reuse the AsyncProcessTask when calling - * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. - * @param taker access the inner buffer. - * @return An AsyncProcessTask which always returns the latest rpc and operation timeout. - */ - private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) { - AsyncProcessTask task = AsyncProcessTask.newBuilder() + private AsyncProcessTask createTask(QueueRowAccess access) { + return new AsyncProcessTask(AsyncProcessTask.newBuilder() .setPool(pool) .setTableName(tableName) - .setRowAccess(taker) + .setRowAccess(access) .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) - .build(); - return new AsyncProcessTask(task) { + .build()) { @Override public int getRpcTimeout() { return rpcTimeout.get(); @@ -362,6 +274,72 @@ public class BufferedMutatorImpl implements BufferedMutator { }; } + @Override + public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { + checkClose(); + doFlush(true); + } + + /** + * Send the operations in the buffer to the servers. + * + * @param flushAll - if true, sends all the writes and wait for all of them to finish before + * returning. Otherwise, flush until buffer size is smaller than threshold + */ + private void doFlush(boolean flushAll) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + List errors = new ArrayList<>(); + while (true) { + if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) { + // There is the room to accept more mutations. + break; + } + AsyncRequestFuture asf; + try (QueueRowAccess access = new QueueRowAccess()) { + if (access.isEmpty()) { + // It means someone has gotten the ticker to run the flush. + break; + } + asf = ap.submit(createTask(access)); + } + // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't + // be released. + asf.waitUntilDone(); + if (asf.hasError()) { + errors.add(asf.getErrors()); + } + } + + RetriesExhaustedWithDetailsException exception = makeException(errors); + if (exception == null) { + return; + } else if(listener == null) { + throw exception; + } else { + listener.onException(exception, this); + } + } + + private static RetriesExhaustedWithDetailsException makeException( + List errors) { + switch (errors.size()) { + case 0: + return null; + case 1: + return errors.get(0); + default: + List exceptions = new ArrayList<>(); + List actions = new ArrayList<>(); + List hostnameAndPort = new ArrayList<>(); + errors.forEach(e -> { + exceptions.addAll(e.exceptions); + actions.addAll(e.actions); + hostnameAndPort.addAll(e.hostnameAndPort); + }); + return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort); + } + } + /** * {@inheritDoc} */ @@ -433,12 +411,15 @@ public class BufferedMutatorImpl implements BufferedMutator { return undealtMutationCount.get(); } - private class QueueRowAccess implements RowAccess { + private class QueueRowAccess implements RowAccess, Closeable { private int remainder = undealtMutationCount.getAndSet(0); - void reset() { - restoreRemainder(); - remainder = undealtMutationCount.getAndSet(0); + @Override + public void close() { + if (remainder > 0) { + undealtMutationCount.addAndGet(remainder); + remainder = 0; + } } @Override @@ -474,6 +455,7 @@ public class BufferedMutatorImpl implements BufferedMutator { iter.remove(); currentWriteBufferSize.addAndGet(-last.heapSize()); --remainder; + last = null; } }; } @@ -483,13 +465,6 @@ public class BufferedMutatorImpl implements BufferedMutator { return remainder; } - void restoreRemainder() { - if (remainder > 0) { - undealtMutationCount.addAndGet(remainder); - remainder = 0; - } - } - @Override public boolean isEmpty() { return remainder <= 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8807884fdc9..a5a01881681 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -261,7 +261,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory); + this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { this.metrics = new MetricsConnection(this); } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 855005efdbc..a4289e939a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1182,10 +1182,9 @@ public class HTable implements Table { final List callbackErrorServers = new ArrayList<>(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = - new AsyncProcess(connection, configuration, - RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, + RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + RpcControllerFactory.instantiate(configuration)); Callback resultsCallback = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 4be39a9c21e..e6b061e45fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -452,7 +452,7 @@ public class HTableMultiplexer { HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory); + this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); this.pool = pool; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java index 16921fe297a..9f92c66d317 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; /** @@ -27,7 +25,7 @@ import org.apache.yetus.audience.InterfaceAudience; * of elements between collections. * @param */ -@InterfaceAudience.Public +@InterfaceAudience.Private public interface RowAccess extends Iterable { /** * @return true if there are no elements. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index fc405df031b..2979dcd5e35 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -171,22 +170,17 @@ public class TestAsyncProcess { } public MyAsyncProcess(ClusterConnection hc, Configuration conf) { - this(hc, conf, new AtomicInteger()); + super(hc, conf, + new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + service = Executors.newFixedThreadPool(5); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { - super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); } - public MyAsyncProcess( - ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { - super(hc, conf, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); - service = Executors.newFixedThreadPool(5); - } - public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) throws InterruptedIOException { @@ -324,7 +318,7 @@ public class TestAsyncProcess { private final IOException ioe; public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { - super(hc, conf, true); + super(hc, conf); this.ioe = ioe; serverTrackerTimeout = 1L; } @@ -656,7 +650,7 @@ public class TestAsyncProcess { + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { mutator.mutate(puts); @@ -807,7 +801,7 @@ public class TestAsyncProcess { @Test public void testFail() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); List puts = new ArrayList<>(1); Put p = createPut(1, false); @@ -834,7 +828,7 @@ public class TestAsyncProcess { @Test public void testSubmitTrue() throws IOException { ClusterConnection conn = createHConnection(); - final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false); + final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, @@ -882,7 +876,7 @@ public class TestAsyncProcess { @Test public void testFailAndSuccess() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); List puts = new ArrayList<>(3); puts.add(createPut(1, false)); @@ -909,7 +903,7 @@ public class TestAsyncProcess { @Test public void testFlush() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); List puts = new ArrayList<>(3); puts.add(createPut(1, false)); @@ -927,7 +921,7 @@ public class TestAsyncProcess { @Test public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { ClusterConnection hc = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); testTaskCount(ap); } @@ -944,7 +938,7 @@ public class TestAsyncProcess { conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, SimpleRequestController.class.getName()); - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); testTaskCount(ap); if (defaultClazz != null) { conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, @@ -981,7 +975,7 @@ public class TestAsyncProcess { conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, SimpleRequestController.class.getName()); - final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false); + final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); SimpleRequestController controller = (SimpleRequestController) ap.requestController; @@ -1087,7 +1081,7 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { ClusterConnection conn = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1104,7 +1098,7 @@ public class TestAsyncProcess { @Test public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { ClusterConnection conn = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); checkPeriodicFlushParameters(conn, ap, 1234, 1234, @@ -1150,7 +1144,7 @@ public class TestAsyncProcess { @Test public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { ClusterConnection conn = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP @@ -1217,7 +1211,7 @@ public class TestAsyncProcess { @Test public void testBufferedMutatorImplWithSharedPool() throws Exception { ClusterConnection conn = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1226,30 +1220,27 @@ public class TestAsyncProcess { } @Test - public void testHTableFailedPutAndNewPut() throws Exception { + public void testFailedPutAndNewPut() throws Exception { ClusterConnection conn = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) .writeBufferSize(0); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); Put p = createPut(1, false); - mutator.mutate(p); - - ap.waitForMaximumCurrentTasks(0, null); // Let's do all the retries. - - // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the - // doPut if it fails. - // This said, it's not a very easy going behavior. For example, when we insert a list of - // puts, we may raise an exception in the middle of the list. It's then up to the caller to - // manage what was inserted, what was tried but failed, and what was not even tried. - p = createPut(1, true); - Assert.assertEquals(0, mutator.size()); try { mutator.mutate(p); Assert.fail(); - } catch (RetriesExhaustedException expected) { + } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } + // Let's do all the retries. + ap.waitForMaximumCurrentTasks(0, null); + Assert.assertEquals(0, mutator.size()); + + // There is no global error so the new put should not fail + mutator.mutate(createPut(1, true)); Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); } @@ -1277,7 +1268,7 @@ public class TestAsyncProcess { public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(CONF); HTable ht = (HTable) conn.getTable(DUMMY_TABLE); - ht.multiAp = new MyAsyncProcess(conn, CONF, false); + ht.multiAp = new MyAsyncProcess(conn, CONF); List puts = new ArrayList<>(7); puts.add(createPut(1, true)); @@ -1307,7 +1298,7 @@ public class TestAsyncProcess { public void testErrorsServers() throws IOException { Configuration configuration = new Configuration(CONF); ClusterConnection conn = new MyConnectionImpl(configuration); - MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); @@ -1323,21 +1314,22 @@ public class TestAsyncProcess { mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } // Checking that the ErrorsServers came into play and didn't make us stop immediately Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); } - @Ignore @Test // Test is failing with wrong count. FIX!! + @Test public void testReadAndWriteTimeout() throws IOException { final long readTimeout = 10 * 1000; final long writeTimeout = 20 * 1000; Configuration copyConf = new Configuration(CONF); copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); - ClusterConnection conn = createHConnection(); - Mockito.when(conn.getConfiguration()).thenReturn(copyConf); - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + ClusterConnection conn = new MyConnectionImpl(copyConf); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { ht.multiAp = ap; List gets = new LinkedList<>(); @@ -1368,7 +1360,7 @@ public class TestAsyncProcess { } @Test - public void testGlobalErrors() throws IOException { + public void testErrors() throws IOException { ClusterConnection conn = new MyConnectionImpl(CONF); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); @@ -1383,6 +1375,8 @@ public class TestAsyncProcess { mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } // Checking that the ErrorsServers came into play and didn't make us stop immediately Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); @@ -1404,6 +1398,8 @@ public class TestAsyncProcess { mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } // Checking that the ErrorsServers came into play and didn't make us stop immediately Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); @@ -1703,7 +1699,7 @@ public class TestAsyncProcess { static class AsyncProcessForThrowableCheck extends AsyncProcess { public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( + super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory( conf)); } } @@ -1759,6 +1755,8 @@ public class TestAsyncProcess { mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } long actualSleep = System.currentTimeMillis() - startTime; long expectedSleep = 0L; @@ -1784,6 +1782,8 @@ public class TestAsyncProcess { mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { + assertEquals(1, expected.getNumExceptions()); + assertTrue(expected.getRow(0) == p); } actualSleep = System.currentTimeMillis() - startTime; expectedSleep = 0L; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java index c46385e5a4d..ffc4e5192f9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -202,7 +202,7 @@ public class TestAsyncProcessWithRegionException { private final ExecutorService service = Executors.newFixedThreadPool(5); MyAsyncProcess(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); } public AsyncRequestFuture submit(TableName tableName, List rows) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 8ef784c7728..0f896b33075 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -127,7 +127,7 @@ public class HConnectionTestingUtility { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( - new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false, + new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf,