HBASE-16556 The read/write timeout are not used in HTable.delete(List), HTable.get(List), and HTable.existsAll(List) (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-09-04 21:15:10 -07:00
parent a0e52a2dab
commit e1aab356b3
3 changed files with 70 additions and 20 deletions

View File

@ -339,6 +339,12 @@ public class HTable implements HTableInterface, RegionLocator {
cleanupConnectionOnClose = false;
// used from tests, don't trust the connection is real
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
}
/**
@ -889,8 +895,8 @@ public class HTable implements HTableInterface, RegionLocator {
return new Result[]{get(gets.get(0))};
}
try {
Object [] r1 = batch((List)gets);
Object[] r1 = new Object[gets.size()];
batch((List<? extends Row>)gets, r1, readRpcTimeout);
// translate.
Result [] results = new Result[r1.length];
int i=0;
@ -905,6 +911,15 @@ public class HTable implements HTableInterface, RegionLocator {
}
}
public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
throws InterruptedException, IOException {
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
/**
* {@inheritDoc}
*/
@ -995,7 +1010,7 @@ public class HTable implements HTableInterface, RegionLocator {
throws IOException {
Object[] results = new Object[deletes.size()];
try {
batch(deletes, results);
batch(deletes, results, writeRpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
} finally {
@ -1423,9 +1438,9 @@ public class HTable implements HTableInterface, RegionLocator {
exists.add(ge);
}
Object[] r1;
Object[] r1 = new Object[exists.size()];
try {
r1 = batch(exists);
batch(exists, r1, readRpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
@ -1485,17 +1500,6 @@ public class HTable implements HTableInterface, RegionLocator {
this.batchCallback(list, results, callback);
}
/**
* Parameterized batch processing, allowing varying return types for different
* {@link Row} implementations.
*/
public void processBatch(final List<? extends Row> list, final Object[] results)
throws IOException, InterruptedException {
this.batch(list, results);
}
@Override
public void close() throws IOException {
if (this.closed) {

View File

@ -152,7 +152,7 @@ public class TestAsyncProcess {
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
private long previousTimeout = -1;
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
@ -208,7 +208,13 @@ public class TestAsyncProcess {
// We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
@Override
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
PayloadCarryingServerCallable callable, int curTimeout) {
previousTimeout = curTimeout;
return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
}
@Override
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
@ -1277,7 +1283,7 @@ public class TestAsyncProcess {
Object[] res = new Object[puts.size()];
try {
ht.processBatch(puts, res);
ht.batch(puts, res);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
@ -1317,6 +1323,46 @@ public class TestAsyncProcess {
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
@Test
public void testReadAndWriteTimeout() throws IOException {
final long readTimeout = 10 * 1000;
final long writeTimeout = 20 * 1000;
Configuration copyConf = new Configuration(conf);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
ClusterConnection conn = createHConnection();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
try (HTable ht = new HTable(conn, bufferParam)) {
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
ht.multiAp = ap;
List<Get> gets = new LinkedList<>();
gets.add(new Get(DUMMY_BYTES_1));
gets.add(new Get(DUMMY_BYTES_2));
try {
ht.get(gets);
} catch (ClassCastException e) {
// No result response on this test.
}
assertEquals(readTimeout, ap.previousTimeout);
ap.previousTimeout = -1;
try {
ht.existsAll(gets);
} catch (ClassCastException e) {
// No result response on this test.
}
assertEquals(readTimeout, ap.previousTimeout);
ap.previousTimeout = -1;
List<Delete> deletes = new LinkedList<>();
deletes.add(new Delete(DUMMY_BYTES_1));
deletes.add(new Delete(DUMMY_BYTES_2));
ht.delete(deletes);
assertEquals(writeTimeout, ap.previousTimeout);
}
}
@Test
public void testGlobalErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(conf);

View File

@ -42,5 +42,5 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
RegionServerServices getRegionServerServices();
/** @return shared data between all instances of this coprocessor */
ConcurrentMap<String, Object> getSharedData();
ConcurrentMap<String, Object> getSharedData();
}