HBASE-10277 refactor AsyncProcess

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1564832 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2014-02-05 17:08:33 +00:00
parent 1c46c16cc0
commit 7898e68fd4
11 changed files with 932 additions and 858 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
@ -524,4 +525,9 @@ public interface HConnection extends Abortable, Closeable {
* @return Nonce generator for this HConnection; may be null if disabled in configuration.
*/
public NonceGenerator getNonceGenerator();
/**
* @return Default AsyncProcess associated with this connection.
*/
AsyncProcess getAsyncProcess();
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@ -76,42 +77,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
@ -233,6 +198,18 @@ public class HConnectionManager {
};
}
/** Dummy nonce generator for disabled nonces. */
static class NoNonceGenerator implements NonceGenerator {
@Override
public long getNonceGroup() {
return HConstants.NO_NONCE;
}
@Override
public long newNonce() {
return HConstants.NO_NONCE;
}
}
/*
* Non-instantiable.
*/
@ -574,6 +551,7 @@ public class HConnectionManager {
final int rpcTimeout;
private NonceGenerator nonceGenerator = null;
private final int prefetchRegionLimit;
private final AsyncProcess asyncProcess;
private volatile boolean closed;
private volatile boolean aborted;
@ -687,18 +665,6 @@ public class HConnectionManager {
}
}
/** Dummy nonce generator for disabled nonces. */
private static class NoNonceGenerator implements NonceGenerator {
@Override
public long getNonceGroup() {
return HConstants.NO_NONCE;
}
@Override
public long newNonce() {
return HConstants.NO_NONCE;
}
}
/**
* For tests.
*/
@ -722,6 +688,7 @@ public class HConnectionManager {
} else {
this.nonceGenerator = new NoNonceGenerator();
}
this.asyncProcess = createAsyncProcess(this.conf);
this.prefetchRegionLimit = conf.getInt(
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
@ -2342,17 +2309,11 @@ public class HConnectionManager {
Batch.Callback<R> callback)
throws IOException, InterruptedException {
// To fulfill the original contract, we have a special callback. This callback
// will set the results in the Object array.
ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
// We're doing a submit all. This way, the originalIndex will match the initial list.
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();
if (asyncProcess.hasError()) {
throw asyncProcess.getErrors();
AsyncRequestFuture ars = this.asyncProcess.submitAll(
pool, tableName, list, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
@ -2368,52 +2329,18 @@ public class HConnectionManager {
processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
}
// For tests.
protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
return new AsyncProcess<R>(this, tableName, pool, callback, conf,
RpcRetryingCallerFactory.instantiate(conf));
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
return new AsyncProcess(
this, conf, this.batchPool, RpcRetryingCallerFactory.instantiate(conf), false);
}
/**
* Fill the result array for the interfaces using it.
*/
private static class ObjectResultFiller<Res>
implements AsyncProcess.AsyncProcessCallback<Res> {
private final Object[] results;
private Batch.Callback<Res> callback;
ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
this.results = results;
this.callback = callback;
}
@Override
public void success(int pos, byte[] region, Row row, Res result) {
assert pos < results.length;
results[pos] = result;
if (callback != null) {
callback.update(region, row.getRow(), result);
}
}
@Override
public boolean failure(int pos, Row row, Throwable t) {
assert pos < results.length;
results[pos] = t;
//Batch.Callback<Res> was not called on failure in 0.94. We keep this.
return true; // we want to have this failure in the failures list.
}
@Override
public boolean retriableFailure(int originalIndex, Row row, Throwable exception) {
return true; // we retry
}
@Override
public AsyncProcess getAsyncProcess() {
return asyncProcess;
}
/*
* Return the number of cached region for a table. It will only be called
* from a unit test.

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -133,7 +134,9 @@ public class HTable implements HTableInterface {
private final boolean cleanupConnectionOnClose; // close the connection in close()
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess<Object> ap;
protected AsyncProcess ap;
/** The Async process for batch */
protected AsyncProcess multiAp;
private RpcRetryingCallerFactory rpcCallerFactory;
/**
@ -211,7 +214,7 @@ public class HTable implements HTableInterface {
this.pool = getDefaultExecutor(this.configuration);
this.finishSetup();
}
public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
if (maxThreads == 0) {
@ -314,7 +317,7 @@ public class HTable implements HTableInterface {
/**
* For internal testing.
*/
protected HTable(){
protected HTable() {
tableName = null;
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
@ -340,8 +343,9 @@ public class HTable implements HTableInterface {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
ap = new AsyncProcess<Object>(connection, tableName, pool, null,
configuration, rpcCallerFactory);
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true);
multiAp = this.connection.getAsyncProcess();
this.maxKeyValueSize = this.configuration.getInt(
"hbase.client.keyvalue.maxsize", -1);
@ -789,9 +793,13 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
public void batch(final List<?extends Row> actions, final Object[] results)
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
batchCallback(actions, results, null);
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
/**
@ -802,7 +810,9 @@ public class HTable implements HTableInterface {
@Override
public Object[] batch(final List<? extends Row> actions)
throws InterruptedException, IOException {
return batchCallback(actions, null);
Object[] results = new Object[actions.size()];
batch(actions, results);
return results;
}
/**
@ -911,7 +921,10 @@ public class HTable implements HTableInterface {
* @throws InterruptedIOException if we were interrupted.
*/
private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
if (ap.hasError()){
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
writeAsyncBuffer.add(put);
backgroundFlushCommits(true);
}
@ -938,30 +951,22 @@ public class HTable implements HTableInterface {
InterruptedIOException, RetriesExhaustedWithDetailsException {
try {
do {
ap.submit(writeAsyncBuffer, true);
} while (synchronous && !writeAsyncBuffer.isEmpty());
if (synchronous) {
ap.waitUntilDone();
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" +
" waiting for all operation in progress to finish (successfully or not)");
}
}
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" +
" waiting for all operation in progress to finish (successfully or not)");
if (synchronous || ap.hasError()) {
while (!writeAsyncBuffer.isEmpty()) {
ap.submit(writeAsyncBuffer, true);
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
ap.waitUntilDone();
if (!clearBufferOnFail) {
// if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
// write buffer. This is a questionable feature kept here for backward compatibility
writeAsyncBuffer.addAll(ap.getFailedOperations());
List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
if (error != null) {
throw error;
}
RetriesExhaustedWithDetailsException e = ap.getErrors();
ap.clearErrors();
throw e;
}
} finally {
currentWriteBufferSize = 0;
@ -1301,8 +1306,7 @@ public class HTable implements HTableInterface {
*/
public void processBatch(final List<? extends Row> list, final Object[] results)
throws IOException, InterruptedException {
this.processBatchCallback(list, results, null);
this.batch(list, results);
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -35,10 +36,9 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public class MultiResponse {
// map of regionName to list of (Results paired to the original index for that
// Result)
private Map<byte[], List<Pair<Integer, Object>>> results =
new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
// map of regionName to map of Results by the original index for that Result
private Map<byte[], Map<Integer, Object>> results =
new TreeMap<byte[], Map<Integer, Object>>(Bytes.BYTES_COMPARATOR);
/**
* The server can send us a failure for the region itself, instead of individual failure.
@ -56,7 +56,7 @@ public class MultiResponse {
*/
public int size() {
int size = 0;
for (Collection<?> c : results.values()) {
for (Map<?,?> c : results.values()) {
size += c.size();
}
return size;
@ -66,25 +66,19 @@ public class MultiResponse {
* Add the pair to the container, grouped by the regionName
*
* @param regionName
* @param r
* First item in the pair is the original index of the Action
* (request). Second item is the Result. Result will be empty for
* successful Put and Delete actions.
* @param index the original index of the Action (request).
* @param result the result; will be empty for successful Put and Delete actions.
*/
public void add(byte[] regionName, Pair<Integer, Object> r) {
List<Pair<Integer, Object>> rs = results.get(regionName);
public void add(byte[] regionName, int originalIndex, Object resOrEx) {
Map<Integer, Object> rs = results.get(regionName);
if (rs == null) {
rs = new ArrayList<Pair<Integer, Object>>();
rs = new HashMap<Integer, Object>();
results.put(regionName, rs);
}
rs.add(r);
rs.put(originalIndex, resOrEx);
}
public void add(byte []regionName, int originalIndex, Object resOrEx) {
add(regionName, new Pair<Integer,Object>(originalIndex, resOrEx));
}
public Map<byte[], List<Pair<Integer, Object>>> getResults() {
public Map<byte[], Map<Integer, Object>> getResults() {
return results;
}

View File

@ -48,16 +48,16 @@ import java.util.Set;
public class RetriesExhaustedWithDetailsException
extends RetriesExhaustedException {
List<Throwable> exceptions;
List<? extends Row> actions;
List<Row> actions;
List<String> hostnameAndPort;
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
List<? extends Row> actions,
List<Row> actions,
List<String> hostnameAndPort) {
super("Failed " + exceptions.size() + " action" +
pluralize(exceptions) + ": " +
getDesc(exceptions, actions, hostnameAndPort));
this.exceptions = exceptions;
this.actions = actions;
this.hostnameAndPort = hostnameAndPort;

View File

@ -103,7 +103,7 @@ public final class ResponseConverter {
}
byte[] regionName = rs.getValue().toByteArray();
if (actionResult.hasException()){
if (actionResult.hasException()) {
Throwable regionException = ProtobufUtil.toException(actionResult.getException());
results.addException(regionName, regionException);
continue;
@ -117,11 +117,9 @@ public final class ResponseConverter {
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
if (roe.hasException()) {
results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
ProtobufUtil.toException(roe.getException())));
results.add(regionName, roe.getIndex(), ProtobufUtil.toException(roe.getException()));
} else if (roe.hasResult()) {
results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
ProtobufUtil.toResult(roe.getResult(), cells)));
results.add(regionName, roe.getIndex(), ProtobufUtil.toResult(roe.getResult(), cells));
} else {
// no result & no exception. Unexpected.
throw new IllegalStateException("No result & no exception roe=" + roe +

View File

@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
@ -73,9 +76,33 @@ public class TestAsyncProcess {
private static final String success = "success";
private static Exception failure = new Exception("failure");
static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
static class MyAsyncProcess extends AsyncProcess {
final AtomicInteger nbMultiResponse = new AtomicInteger();
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<Res> callback, Object[] results, boolean needResults) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
r.hardRetryLimit = new AtomicInteger(1);
allReqs.add(r);
return r;
}
@SuppressWarnings("unchecked")
public long getRetriesRequested() {
long result = 0;
for (AsyncRequestFuture ars : allReqs) {
if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) {
result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get());
}
}
return result;
}
static class CountingThreadFactory implements ThreadFactory {
final AtomicInteger nbThreads;
@ -91,15 +118,29 @@ public class TestAsyncProcess {
}
}
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
this(hc, callback, conf, new AtomicInteger());
public MyAsyncProcess(HConnection hc, Configuration conf) {
this(hc, conf, new AtomicInteger());
}
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf,
AtomicInteger nbThreads) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
callback, conf, new RpcRetryingCallerFactory(conf));
public MyAsyncProcess(HConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
new RpcRetryingCallerFactory(conf), false);
}
public MyAsyncProcess(
HConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
new RpcRetryingCallerFactory(conf), useGlobalErrors);
}
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults)
throws InterruptedIOException {
// We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
@Override
@ -112,7 +153,7 @@ public class TestAsyncProcess {
throws IOException, RuntimeException {
try {
// sleep one second in order for threadpool to start another thread instead of reusing
// existing one.
// existing one.
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore error
@ -144,7 +185,6 @@ public class TestAsyncProcess {
* Returns our async process.
*/
static class MyConnectionImpl extends HConnectionManager.HConnectionImplementation {
MyAsyncProcess<?> ap;
final AtomicInteger nbThreads = new AtomicInteger(0);
final static Configuration c = new Configuration();
@ -160,15 +200,6 @@ public class TestAsyncProcess {
super(conf);
}
@Override
protected <R> AsyncProcess createAsyncProcess(TableName tableName,
ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback,
Configuration confn ) {
ap = new MyAsyncProcess<R>(this, callback, c, nbThreads);
return ap;
}
@Override
public HRegionLocation locateRegion(final TableName tableName,
final byte[] row) {
@ -207,55 +238,57 @@ public class TestAsyncProcess {
@Test
public void testSubmit() throws Exception {
HConnection hc = createHConnection();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty());
}
@Test
public void testSubmitWithCB() throws Exception {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
public void update(byte[] region, byte[] row, Object result) {
updateCalled.incrementAndGet();
}
};
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
ap.submit(puts, false);
final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
Assert.assertTrue(puts.isEmpty());
while (!(mcb.successCalled.get() == 1) && !ap.hasError()) {
Thread.sleep(1);
}
Assert.assertEquals(mcb.successCalled.get(), 1);
ars.waitUntilDone();
Assert.assertEquals(updateCalled.get(), 1);
}
@Test
public void testSubmitBusyRegion() throws Exception {
HConnection hc = createHConnection();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(puts.size(), 1);
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
ap.submit(puts, false);
Assert.assertTrue(puts.isEmpty());
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(0, puts.size());
}
@Test
public void testSubmitBusyRegionServer() throws Exception {
HConnection hc = createHConnection();
AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, null, conf);
AsyncProcess ap = new MyAsyncProcess(hc, conf);
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
@ -265,80 +298,44 @@ public class TestAsyncProcess {
puts.add(createPut(1, true)); // <== this one will make it, the region is already in
puts.add(createPut(2, true)); // <== new region, but the rs is ok
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(" puts=" + puts, 1, puts.size());
ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty());
}
@Test
public void testFail() throws Exception {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
List<Put> puts = new ArrayList<Put>();
Put p = createPut(1, false);
puts.add(p);
ap.submit(puts, false);
Assert.assertTrue(puts.isEmpty());
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
verifyResult(ars, false);
Assert.assertEquals(2L, ap.getRetriesRequested());
while (!ap.hasError()) {
Thread.sleep(1);
}
Assert.assertEquals(1, ars.getErrors().exceptions.size());
Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
failure.equals(ars.getErrors().exceptions.get(0)));
Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
failure.equals(ars.getErrors().exceptions.get(0)));
Assert.assertEquals(0, mcb.successCalled.get());
Assert.assertEquals(2, mcb.retriableFailure.get());
Assert.assertEquals(1, mcb.failureCalled.get());
Assert.assertEquals(1, ap.getErrors().exceptions.size());
Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
failure.equals(ap.getErrors().exceptions.get(0)));
Assert.assertTrue("was: " + ap.getErrors().exceptions.get(0),
failure.equals(ap.getErrors().exceptions.get(0)));
Assert.assertEquals(1, ap.getFailedOperations().size());
Assert.assertTrue("was: " + ap.getFailedOperations().get(0),
p.equals(ap.getFailedOperations().get(0)));
Assert.assertEquals(1, ars.getFailedOperations().size());
Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
p.equals(ars.getFailedOperations().get(0)));
}
@Test
public void testWaitForNextTaskDone() throws IOException {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
final AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
ap.tasksSent.incrementAndGet();
final AtomicBoolean checkPoint = new AtomicBoolean(false);
final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
Thread t = new Thread(){
@Override
public void run(){
Threads.sleep(1000);
Assert.assertFalse(checkPoint.get());
ap.tasksDone.incrementAndGet();
checkPoint2.set(true);
}
};
t.start();
ap.waitForNextTaskDone(0);
checkPoint.set(true);
while (!checkPoint2.get()){
Threads.sleep(1);
}
}
@Test
public void testSubmitTrue() throws IOException {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
final AsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
ap.tasksSent.incrementAndGet();
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
ap.tasksInProgress.incrementAndGet();
final AtomicInteger ai = new AtomicInteger(1);
ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
@ -349,9 +346,9 @@ public class TestAsyncProcess {
@Override
public void run(){
Threads.sleep(1000);
Assert.assertFalse(checkPoint.get());
Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
ai.decrementAndGet();
ap.tasksDone.incrementAndGet();
ap.tasksInProgress.decrementAndGet();
checkPoint2.set(true);
}
};
@ -360,12 +357,12 @@ public class TestAsyncProcess {
Put p = createPut(1, true);
puts.add(p);
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.assertFalse(puts.isEmpty());
t.start();
ap.submit(puts, true);
ap.submit(DUMMY_TABLE, puts, true, null, false);
Assert.assertTrue(puts.isEmpty());
checkPoint.set(true);
@ -376,71 +373,50 @@ public class TestAsyncProcess {
@Test
public void testFailAndSuccess() throws Exception {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, false));
puts.add(createPut(1, true));
puts.add(createPut(1, true));
ap.submit(puts, false);
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertTrue(puts.isEmpty());
long cutoff = System.currentTimeMillis() + 60000;
while (!ap.hasError() && System.currentTimeMillis() < cutoff) {
Thread.sleep(1);
}
Assert.assertTrue(ap.hasError());
ap.waitUntilDone();
Assert.assertEquals(mcb.successCalled.get(), 2);
Assert.assertEquals(mcb.retriableFailure.get(), 2);
Assert.assertEquals(mcb.failureCalled.get(), 1);
Assert.assertEquals(1, ap.getErrors().actions.size());
ars.waitUntilDone();
verifyResult(ars, false, true, true);
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(1, ars.getErrors().actions.size());
puts.add(createPut(1, true));
ap.submit(puts, false);
Assert.assertTrue(puts.isEmpty());
while (mcb.successCalled.get() != 3) {
Thread.sleep(1);
}
Assert.assertEquals(mcb.retriableFailure.get(), 2);
Assert.assertEquals(mcb.failureCalled.get(), 1);
ap.clearErrors();
Assert.assertTrue(ap.getErrors().actions.isEmpty());
// Wait for AP to be free. While ars might have the result, ap counters are decreased later.
ap.waitUntilDone();
ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
Assert.assertEquals(2, ap.getRetriesRequested());
verifyResult(ars, true);
}
@Test
public void testFlush() throws Exception {
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
AsyncProcess ap = new MyAsyncProcess<Object>(hc, mcb, conf);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, false));
puts.add(createPut(1, true));
puts.add(createPut(1, true));
ap.submit(puts, false);
ap.waitUntilDone();
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
ars.waitUntilDone();
verifyResult(ars, false, true, true);
Assert.assertEquals(2, ap.getRetriesRequested());
Assert.assertEquals(mcb.successCalled.get(), 2);
Assert.assertEquals(mcb.retriableFailure.get(), 2);
Assert.assertEquals(mcb.failureCalled.get(), 1);
Assert.assertEquals(1, ap.getFailedOperations().size());
Assert.assertEquals(1, ars.getFailedOperations().size());
}
@Test
public void testMaxTask() throws Exception {
HConnection hc = createHConnection();
final AsyncProcess ap = new MyAsyncProcess<Object>(hc, null, conf);
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
for (int i = 0; i < 1000; i++) {
ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
@ -461,7 +437,7 @@ public class TestAsyncProcess {
t.start();
try {
ap.submit(puts, false);
ap.submit(DUMMY_TABLE, puts, false, null, false);
Assert.fail("We should have been interrupted.");
} catch (InterruptedIOException expected) {
}
@ -471,7 +447,7 @@ public class TestAsyncProcess {
Thread t2 = new Thread() {
public void run() {
Threads.sleep(sleepTime);
while (ap.tasksDone.get() > 0) {
while (ap.tasksInProgress.get() > 0) {
ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
}
}
@ -479,39 +455,13 @@ public class TestAsyncProcess {
t2.start();
long start = System.currentTimeMillis();
ap.submit(new ArrayList<Row>(), false);
ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
long end = System.currentTimeMillis();
//Adds 100 to secure us against approximate timing.
Assert.assertTrue(start + 100L + sleepTime > end);
}
private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
private final AtomicInteger successCalled = new AtomicInteger(0);
private final AtomicInteger failureCalled = new AtomicInteger(0);
private final AtomicInteger retriableFailure = new AtomicInteger(0);
@Override
public void success(int originalIndex, byte[] region, Row row, Object o) {
successCalled.incrementAndGet();
}
@Override
public boolean failure(int originalIndex, Row row, Throwable t) {
failureCalled.incrementAndGet();
return true;
}
@Override
public boolean retriableFailure(int originalIndex, Row row, Throwable exception) {
// We retry once only.
return (retriableFailure.incrementAndGet() < 2);
}
}
private static HConnection createHConnection() throws IOException {
HConnection hc = Mockito.mock(HConnection.class);
@ -535,14 +485,17 @@ public class TestAsyncProcess {
Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE),
Mockito.eq(FAILS))).thenReturn(loc2);
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
return hc;
}
@Test
public void testHTablePutSuccess() throws Exception {
HTable ht = Mockito.mock(HTable.class);
HConnection hc = createHConnection();
ht.ap = new MyAsyncProcess<Object>(hc, null, conf);
ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
Put put = createPut(1, true);
@ -553,9 +506,8 @@ public class TestAsyncProcess {
private void doHTableFailedPut(boolean bufferOn) throws Exception {
HTable ht = new HTable();
HConnection hc = createHConnection();
MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap;
ht.setAutoFlush(true, true);
if (bufferOn) {
ht.setWriteBufferSize(1024L * 1024L);
@ -575,9 +527,15 @@ public class TestAsyncProcess {
} catch (RetriesExhaustedException expected) {
}
Assert.assertEquals(0L, ht.currentWriteBufferSize);
Assert.assertEquals(0, mcb.successCalled.get());
Assert.assertEquals(2, mcb.retriableFailure.get());
Assert.assertEquals(1, mcb.failureCalled.get());
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
if (someReqs.getResults().length == 0) continue;
Assert.assertTrue(ars == null);
ars = someReqs;
}
Assert.assertTrue(ars != null);
verifyResult(ars, false);
// This should not raise any exception, puts have been 'received' before by the catch.
ht.close();
@ -589,23 +547,22 @@ public class TestAsyncProcess {
}
@Test
public void doHTableFailedPutWithoutBuffer() throws Exception {
public void testHTableFailedPutWithoutBuffer() throws Exception {
doHTableFailedPut(false);
}
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
HTable ht = new HTable();
HConnection hc = createHConnection();
MyCB mcb = new MyCB(); // This allows to have some hints on what's going on.
ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.ap = ap;
ht.setAutoFlush(false, true);
ht.setWriteBufferSize(0);
Put p = createPut(1, false);
ht.put(p);
ht.ap.waitUntilDone(); // Let's do all the retries.
ap.waitUntilDone(); // Let's do all the retries.
// We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
// doPut if it fails.
@ -626,14 +583,13 @@ public class TestAsyncProcess {
@Test
public void testWithNoClearOnFail() throws IOException {
HTable ht = new HTable();
HConnection hc = createHConnection();
MyCB mcb = new MyCB();
ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
ht.setAutoFlush(false, false);
Put p = createPut(1, false);
ht.put(p);
Assert.assertEquals(0, ht.writeAsyncBuffer.size());
try {
ht.flushCommits();
} catch (RetriesExhaustedWithDetailsException expected) {
@ -651,6 +607,7 @@ public class TestAsyncProcess {
public void testBatch() throws IOException, InterruptedException {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl();
ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@ -686,10 +643,9 @@ public class TestAsyncProcess {
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
MyConnectionImpl mci = new MyConnectionImpl(configuration);
ht.connection = mci;
ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
ht.connection = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true);
ht.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
@ -705,7 +661,7 @@ public class TestAsyncProcess {
} catch (RetriesExhaustedWithDetailsException expected) {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(ht.ap.tasksSent.get(), 3);
Assert.assertEquals(2, ap.getRetriesRequested());
}
/**
@ -730,11 +686,13 @@ public class TestAsyncProcess {
HTable ht = new HTable();
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
ht.connection = con;
MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
ht.multiAp = ap;
ht.batch(gets);
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
int nbReg = 0;
@ -744,6 +702,13 @@ public class TestAsyncProcess {
Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
}
private void verifyResult(AsyncRequestFuture ars, boolean... expected) {
Object[] actual = ars.getResults();
Assert.assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; ++i) {
Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
}
}
/**
* @param regCnt the region: 1 to 3.

View File

@ -63,6 +63,7 @@ import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CoprocessorHConnection implements HConnection {
private static final NonceGenerator ng = new HConnectionManager.NoNonceGenerator();
/**
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
@ -388,6 +389,11 @@ public class CoprocessorHConnection implements HConnection {
@Override
public NonceGenerator getNonceGenerator() {
return null; // don't use nonces for coprocessor connection
return ng; // don't use nonces for coprocessor connection
}
@Override
public AsyncProcess getAsyncProcess() {
return delegate.getAsyncProcess();
}
}

View File

@ -117,6 +117,10 @@ public class HConnectionTestingUtility {
Mockito.when(c.getClient(Mockito.any(ServerName.class))).
thenReturn(client);
}
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(new AsyncProcess(
c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false));
return c;
}

View File

@ -62,8 +62,13 @@ import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -73,6 +78,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@ -105,6 +112,18 @@ public class TestCatalogJanitor {
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
try {
Mockito.when(ri.multi(
(RpcController)Mockito.any(), (MultiRequest)Mockito.any())).
thenAnswer(new Answer<MultiResponse>() {
@Override
public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
return buildMultiResponse( (MultiRequest)invocation.getArguments()[1]);
}
});
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
// Mock an HConnection and a AdminProtocol implementation. Have the
// HConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work.
@ -940,5 +959,23 @@ public class TestCatalogJanitor {
return htd;
}
private MultiResponse buildMultiResponse(MultiRequest req) {
MultiResponse.Builder builder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder =
RegionActionResult.newBuilder();
ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
for (RegionAction regionAction: req.getRegionActionList()) {
regionActionResultBuilder.clear();
for (ClientProtos.Action action: regionAction.getActionList()) {
roeBuilder.clear();
roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
roeBuilder.setIndex(action.getIndex());
regionActionResultBuilder.addResultOrException(roeBuilder.build());
}
builder.addRegionActionResult(regionActionResultBuilder.build());
}
return builder.build();
}
}