HBASE-19680 BufferedMutatorImpl#mutate should wait the result from AP in order to throw the failed mutations

This commit is contained in:
Chia-Ping Tsai 2018-02-17 07:16:14 +08:00
parent d59959d19e
commit 34d5f2b70e
11 changed files with 175 additions and 246 deletions

View File

@ -65,17 +65,12 @@ import org.apache.hadoop.hbase.util.Bytes;
* The class manages internally the retries.
* </p>
* <p>
* The class can be constructed in regular mode, or "global error" mode. In global error mode,
* AP tracks errors across all calls (each "future" also has global view of all errors). That
* mode is necessary for backward compat with HTable behavior, where multiple submissions are
* made and the errors can propagate using any put/flush call, from previous calls.
* In "regular" mode, the errors are tracked inside the Future object that is returned.
* The errors are tracked inside the Future object that is returned.
* The results are always tracked inside the Future object and can be retrieved when the call
* has finished. Partial results can also be retrieved if some part of multi-request failed.
* </p>
* <p>
* This class is thread safe in regular mode; in global error code, submitting operations and
* retrieving errors from different threads may be not thread safe.
* This class is thread safe.
* Internally, the class is thread safe enough to manage simultaneously new submission and results
* arising from older operations.
* </p>
@ -144,7 +139,6 @@ class AsyncProcess {
final ClusterConnection connection;
private final RpcRetryingCallerFactory rpcCallerFactory;
final RpcControllerFactory rpcFactory;
final BatchErrors globalErrors;
// Start configuration settings.
final int startLogErrorsCnt;
@ -168,14 +162,12 @@ class AsyncProcess {
private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
private final int periodToLog;
AsyncProcess(ClusterConnection hc, Configuration conf,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
RpcControllerFactory rpcFactory) {
RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
this.connection = hc;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
this.id = COUNTER.incrementAndGet();
@ -445,10 +437,10 @@ class AsyncProcess {
private Consumer<Long> getLogger(TableName tableName, long max) {
return (currentInProgress) -> {
LOG.info("#" + id + (max < 0 ? ", waiting for any free slot"
: ", waiting for some tasks to finish. Expected max="
+ max) + ", tasksInProgress=" + currentInProgress +
" hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
LOG.info("#" + id + (max < 0 ?
", waiting for any free slot" :
", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
+ currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
};
}
@ -460,38 +452,6 @@ class AsyncProcess {
void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
requestController.decTaskCounters(regions, sn);
}
/**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* @return Whether there were any errors in any request since the last time
* {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created.
*/
public boolean hasError() {
return globalErrors != null && globalErrors.hasErrors();
}
/**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* Waits for all previous operations to finish, and returns errors and (optionally)
* failed operations themselves.
* @param failedRows an optional list into which the rows that failed since the last time
* {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created, are saved.
* @param tableName name of the table
* @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, TableName)}
* was called, or AP was created.
*/
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
List<Row> failedRows, TableName tableName) throws InterruptedIOException {
waitForMaximumCurrentTasks(0, tableName);
if (globalErrors == null || !globalErrors.hasErrors()) {
return null;
}
if (failedRows != null) {
failedRows.addAll(globalErrors.actions);
}
RetriesExhaustedWithDetailsException result = globalErrors.makeException(logBatchErrorDetails);
globalErrors.clear();
return result;
}
/**
* Create a caller. Isolated to be easily overridden in the tests.

View File

@ -24,10 +24,8 @@ import java.io.InterruptedIOException;
import java.util.List;
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
* then errors and failed operations in this object will reflect global errors.
* 2) If submit call is made with needResults false, results will not be saved.
* The context used to wait for results from one submit call. If submit call is made with
* needResults false, results will not be saved.
* @since 2.0.0
*/
@InterfaceAudience.Private

View File

@ -388,8 +388,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
this.asyncProcess = asyncProcess;
this.errorsByServer = createServerErrorTracker();
this.errors = (asyncProcess.globalErrors != null)
? asyncProcess.globalErrors : new BatchErrors();
this.errors = new BatchErrors();
this.operationTimeout = task.getOperationTimeout();
this.rpcTimeout = task.getRpcTimeout();
this.currentCallable = task.getCallable();

View File

@ -16,8 +16,11 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -141,7 +144,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
this(conn, params,
// puts need to track errors globally due to how the APIs currently work.
new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
}
private void checkClose() {
if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
}
}
@VisibleForTesting
@ -173,16 +182,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override
public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
}
checkClose();
long toAddSize = 0;
int toAddCount = 0;
for (Mutation m : ms) {
if (m instanceof Put) {
validatePut((Put) m);
HTable.validatePut((Put) m, maxKeyValueSize);
}
toAddSize += m.heapSize();
++toAddCount;
@ -191,26 +197,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (currentWriteBufferSize.get() == 0) {
firstRecordInBufferTimestamp.set(System.currentTimeMillis());
}
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
currentWriteBufferSize.addAndGet(toAddSize);
writeAsyncBuffer.addAll(ms);
undealtMutationCount.addAndGet(toAddCount);
backgroundFlushCommits(true);
} else {
currentWriteBufferSize.addAndGet(toAddSize);
writeAsyncBuffer.addAll(ms);
undealtMutationCount.addAndGet(toAddCount);
}
// Now try and queue what needs to be queued.
while (undealtMutationCount.get() != 0
&& currentWriteBufferSize.get() > writeBufferSize) {
backgroundFlushCommits(false);
}
currentWriteBufferSize.addAndGet(toAddSize);
writeAsyncBuffer.addAll(ms);
undealtMutationCount.addAndGet(toAddCount);
doFlush(false);
}
@VisibleForTesting
@ -238,118 +228,40 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
}
// validate for well-formedness
public void validatePut(final Put put) throws IllegalArgumentException {
HTable.validatePut(put, maxKeyValueSize);
}
@Override
public synchronized void close() throws IOException {
try {
if (this.closed) {
return;
}
// Stop any running Periodic Flush timer.
disableWriteBufferPeriodicFlush();
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
if (cleanupPoolOnClose) {
this.pool.shutdown();
boolean terminated;
int loopCnt = 0;
do {
// wait until the pool has terminated
terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
loopCnt += 1;
if (loopCnt >= 10) {
LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
break;
}
} while (!terminated);
}
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
} finally {
this.closed = true;
}
}
@Override
public synchronized void flush() throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
}
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
* the is an error (max retried reach from a previous flush or bad operation), it tries to send
* all operations in the buffer and sends an exception.
*
* @param synchronous - if true, sends all the writes and wait for all of them to finish before
* returning.
*/
private void backgroundFlushCommits(boolean synchronous) throws
InterruptedIOException,
RetriesExhaustedWithDetailsException {
if (!synchronous && writeAsyncBuffer.isEmpty()) {
if (closed) {
return;
}
if (!synchronous) {
QueueRowAccess taker = new QueueRowAccess();
AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
ap.submit(task);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)");
}
} finally {
taker.restoreRemainder();
}
}
if (synchronous || ap.hasError()) {
QueueRowAccess taker = new QueueRowAccess();
AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
while (!taker.isEmpty()) {
ap.submit(task);
taker.reset();
}
} finally {
taker.restoreRemainder();
}
RetriesExhaustedWithDetailsException error =
ap.waitForAllPreviousOpsAndReset(null, tableName);
if (error != null) {
if (listener == null) {
throw error;
} else {
this.listener.onException(error, this);
// Stop any running Periodic Flush timer.
disableWriteBufferPeriodicFlush();
try {
// As we can have an operation in progress even if the buffer is empty, we call
// doFlush at least one time.
doFlush(true);
} finally {
if (cleanupPoolOnClose) {
this.pool.shutdown();
try {
if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
}
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
Thread.currentThread().interrupt();
}
}
closed = true;
}
}
/**
* Reuse the AsyncProcessTask when calling
* {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
* @param taker access the inner buffer.
* @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
*/
private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
AsyncProcessTask task = AsyncProcessTask.newBuilder()
private AsyncProcessTask createTask(QueueRowAccess access) {
return new AsyncProcessTask(AsyncProcessTask.newBuilder()
.setPool(pool)
.setTableName(tableName)
.setRowAccess(taker)
.setRowAccess(access)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
.build();
return new AsyncProcessTask(task) {
.build()) {
@Override
public int getRpcTimeout() {
return rpcTimeout.get();
@ -362,6 +274,72 @@ public class BufferedMutatorImpl implements BufferedMutator {
};
}
@Override
public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
checkClose();
doFlush(true);
}
/**
* Send the operations in the buffer to the servers.
*
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
* returning. Otherwise, flush until buffer size is smaller than threshold
*/
private void doFlush(boolean flushAll) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
while (true) {
if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
// There is the room to accept more mutations.
break;
}
AsyncRequestFuture asf;
try (QueueRowAccess access = new QueueRowAccess()) {
if (access.isEmpty()) {
// It means someone has gotten the ticker to run the flush.
break;
}
asf = ap.submit(createTask(access));
}
// DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
// be released.
asf.waitUntilDone();
if (asf.hasError()) {
errors.add(asf.getErrors());
}
}
RetriesExhaustedWithDetailsException exception = makeException(errors);
if (exception == null) {
return;
} else if(listener == null) {
throw exception;
} else {
listener.onException(exception, this);
}
}
private static RetriesExhaustedWithDetailsException makeException(
List<RetriesExhaustedWithDetailsException> errors) {
switch (errors.size()) {
case 0:
return null;
case 1:
return errors.get(0);
default:
List<Throwable> exceptions = new ArrayList<>();
List<Row> actions = new ArrayList<>();
List<String> hostnameAndPort = new ArrayList<>();
errors.forEach(e -> {
exceptions.addAll(e.exceptions);
actions.addAll(e.actions);
hostnameAndPort.addAll(e.hostnameAndPort);
});
return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
}
}
/**
* {@inheritDoc}
*/
@ -433,12 +411,15 @@ public class BufferedMutatorImpl implements BufferedMutator {
return undealtMutationCount.get();
}
private class QueueRowAccess implements RowAccess<Row> {
private class QueueRowAccess implements RowAccess<Row>, Closeable {
private int remainder = undealtMutationCount.getAndSet(0);
void reset() {
restoreRemainder();
remainder = undealtMutationCount.getAndSet(0);
@Override
public void close() {
if (remainder > 0) {
undealtMutationCount.addAndGet(remainder);
remainder = 0;
}
}
@Override
@ -474,6 +455,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
iter.remove();
currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder;
last = null;
}
};
}
@ -483,13 +465,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
return remainder;
}
void restoreRemainder() {
if (remainder > 0) {
undealtMutationCount.addAndGet(remainder);
remainder = 0;
}
}
@Override
public boolean isEmpty() {
return remainder <= 0;

View File

@ -261,7 +261,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
} else {

View File

@ -1182,10 +1182,9 @@ public class HTable implements Table {
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
true, RpcControllerFactory.instantiate(configuration));
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
RpcControllerFactory.instantiate(configuration));
Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
= (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {

View File

@ -452,7 +452,7 @@ public class HTableMultiplexer {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
this.pool = pool;

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -27,7 +25,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* of elements between collections.
* @param <T>
*/
@InterfaceAudience.Public
@InterfaceAudience.Private
public interface RowAccess<T> extends Iterable<T> {
/**
* @return true if there are no elements.

View File

@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -171,22 +170,17 @@ public class TestAsyncProcess {
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
this(hc, conf, new AtomicInteger());
super(hc, conf,
new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf,
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5);
}
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) throws InterruptedIOException {
@ -324,7 +318,7 @@ public class TestAsyncProcess {
private final IOException ioe;
public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
super(hc, conf, true);
super(hc, conf);
this.ioe = ioe;
serverTrackerTimeout = 1L;
}
@ -656,7 +650,7 @@ public class TestAsyncProcess {
+ ", minCountSnRequest:" + minCountSnRequest
+ ", minCountSn2Request:" + minCountSn2Request);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
mutator.mutate(puts);
@ -807,7 +801,7 @@ public class TestAsyncProcess {
@Test
public void testFail() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(1);
Put p = createPut(1, false);
@ -834,7 +828,7 @@ public class TestAsyncProcess {
@Test
public void testSubmitTrue() throws IOException {
ClusterConnection conn = createHConnection();
final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
final String defaultClazz =
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@ -882,7 +876,7 @@ public class TestAsyncProcess {
@Test
public void testFailAndSuccess() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@ -909,7 +903,7 @@ public class TestAsyncProcess {
@Test
public void testFlush() throws Exception {
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
List<Put> puts = new ArrayList<>(3);
puts.add(createPut(1, false));
@ -927,7 +921,7 @@ public class TestAsyncProcess {
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
testTaskCount(ap);
}
@ -944,7 +938,7 @@ public class TestAsyncProcess {
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
SimpleRequestController.class.getName());
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false);
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
testTaskCount(ap);
if (defaultClazz != null) {
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@ -981,7 +975,7 @@ public class TestAsyncProcess {
conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
SimpleRequestController.class.getName());
final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
SimpleRequestController controller = (SimpleRequestController) ap.requestController;
@ -1087,7 +1081,7 @@ public class TestAsyncProcess {
@Test
public void testHTablePutSuccess() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1104,7 +1098,7 @@ public class TestAsyncProcess {
@Test
public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
checkPeriodicFlushParameters(conn, ap,
1234, 1234,
@ -1150,7 +1144,7 @@ public class TestAsyncProcess {
@Test
public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
@ -1217,7 +1211,7 @@ public class TestAsyncProcess {
@Test
public void testBufferedMutatorImplWithSharedPool() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
@ -1226,30 +1220,27 @@ public class TestAsyncProcess {
}
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
public void testFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
.writeBufferSize(0);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
Put p = createPut(1, false);
mutator.mutate(p);
ap.waitForMaximumCurrentTasks(0, null); // Let's do all the retries.
// We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
// doPut if it fails.
// This said, it's not a very easy going behavior. For example, when we insert a list of
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
Assert.assertEquals(0, mutator.size());
try {
mutator.mutate(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
// Let's do all the retries.
ap.waitForMaximumCurrentTasks(0, null);
Assert.assertEquals(0, mutator.size());
// There is no global error so the new put should not fail
mutator.mutate(createPut(1, true));
Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
}
@ -1277,7 +1268,7 @@ public class TestAsyncProcess {
public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(CONF);
HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
ht.multiAp = new MyAsyncProcess(conn, CONF, false);
ht.multiAp = new MyAsyncProcess(conn, CONF);
List<Put> puts = new ArrayList<>(7);
puts.add(createPut(1, true));
@ -1307,7 +1298,7 @@ public class TestAsyncProcess {
public void testErrorsServers() throws IOException {
Configuration configuration = new Configuration(CONF);
ClusterConnection conn = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
@ -1323,21 +1314,22 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
@Ignore @Test // Test is failing with wrong count. FIX!!
@Test
public void testReadAndWriteTimeout() throws IOException {
final long readTimeout = 10 * 1000;
final long writeTimeout = 20 * 1000;
Configuration copyConf = new Configuration(CONF);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
ClusterConnection conn = createHConnection();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
ClusterConnection conn = new MyConnectionImpl(copyConf);
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
ht.multiAp = ap;
List<Get> gets = new LinkedList<>();
@ -1368,7 +1360,7 @@ public class TestAsyncProcess {
}
@Test
public void testGlobalErrors() throws IOException {
public void testErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(CONF);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@ -1383,6 +1375,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@ -1404,6 +1398,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@ -1703,7 +1699,7 @@ public class TestAsyncProcess {
static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
conf));
}
}
@ -1759,6 +1755,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
long actualSleep = System.currentTimeMillis() - startTime;
long expectedSleep = 0L;
@ -1784,6 +1782,8 @@ public class TestAsyncProcess {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
assertEquals(1, expected.getNumExceptions());
assertTrue(expected.getRow(0) == p);
}
actualSleep = System.currentTimeMillis() - startTime;
expectedSleep = 0L;

View File

@ -202,7 +202,7 @@ public class TestAsyncProcessWithRegionException {
private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
}
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)

View File

@ -127,7 +127,7 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
RpcControllerFactory.instantiate(conf)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,