HBASE-16556 The read/write timeout are not used in HTable.delete(List), HTable.get(List), and HTable.existsAll(List) (ChiaPing Tsai)
This commit is contained in:
parent
520c3cc4e9
commit
592245ff13
|
@ -196,6 +196,12 @@ public class HTable implements Table {
|
||||||
cleanupConnectionOnClose = false;
|
cleanupConnectionOnClose = false;
|
||||||
// used from tests, don't trust the connection is real
|
// used from tests, don't trust the connection is real
|
||||||
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
|
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
|
||||||
|
this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
|
||||||
|
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
|
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||||
|
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -453,7 +459,7 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Object[] r1 = new Object[gets.size()];
|
Object[] r1 = new Object[gets.size()];
|
||||||
batch((List<? extends Row>)gets, r1);
|
batch((List<? extends Row>)gets, r1, readRpcTimeout);
|
||||||
// Translate.
|
// Translate.
|
||||||
Result [] results = new Result[r1.length];
|
Result [] results = new Result[r1.length];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -480,6 +486,15 @@ public class HTable implements Table {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
|
||||||
|
throws InterruptedException, IOException {
|
||||||
|
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
|
||||||
|
ars.waitUntilDone();
|
||||||
|
if (ars.hasError()) {
|
||||||
|
throw ars.getErrors();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
@ -529,7 +544,7 @@ public class HTable implements Table {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Object[] results = new Object[deletes.size()];
|
Object[] results = new Object[deletes.size()];
|
||||||
try {
|
try {
|
||||||
batch(deletes, results);
|
batch(deletes, results, writeRpcTimeout);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -866,7 +881,7 @@ public class HTable implements Table {
|
||||||
|
|
||||||
Object[] r1= new Object[exists.size()];
|
Object[] r1= new Object[exists.size()];
|
||||||
try {
|
try {
|
||||||
batch(exists, r1);
|
batch(exists, r1, readRpcTimeout);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||||
}
|
}
|
||||||
|
@ -910,17 +925,6 @@ public class HTable implements Table {
|
||||||
this.batchCallback(list, results, callback);
|
this.batchCallback(list, results, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parameterized batch processing, allowing varying return types for different
|
|
||||||
* {@link Row} implementations.
|
|
||||||
*/
|
|
||||||
public void processBatch(final List<? extends Row> list, final Object[] results)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
this.batch(list, results);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (this.closed) {
|
if (this.closed) {
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class TestAsyncProcess {
|
||||||
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
|
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
|
||||||
public AtomicInteger callsCt = new AtomicInteger();
|
public AtomicInteger callsCt = new AtomicInteger();
|
||||||
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
private long previousTimeout = -1;
|
||||||
@Override
|
@Override
|
||||||
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
|
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
|
||||||
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
|
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
|
||||||
|
@ -210,7 +210,13 @@ public class TestAsyncProcess {
|
||||||
// We use results in tests to check things, so override to always save them.
|
// We use results in tests to check things, so override to always save them.
|
||||||
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
|
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
|
||||||
|
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
|
||||||
|
CancellableRegionServerCallable callable, int curTimeout) {
|
||||||
|
previousTimeout = curTimeout;
|
||||||
|
return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
|
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
|
||||||
|
@ -1273,7 +1279,7 @@ public class TestAsyncProcess {
|
||||||
|
|
||||||
Object[] res = new Object[puts.size()];
|
Object[] res = new Object[puts.size()];
|
||||||
try {
|
try {
|
||||||
ht.processBatch(puts, res);
|
ht.batch(puts, res);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
} catch (RetriesExhaustedException expected) {
|
} catch (RetriesExhaustedException expected) {
|
||||||
}
|
}
|
||||||
|
@ -1313,6 +1319,46 @@ public class TestAsyncProcess {
|
||||||
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
|
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadAndWriteTimeout() throws IOException {
|
||||||
|
final long readTimeout = 10 * 1000;
|
||||||
|
final long writeTimeout = 20 * 1000;
|
||||||
|
Configuration copyConf = new Configuration(conf);
|
||||||
|
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
|
||||||
|
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
|
||||||
|
ClusterConnection conn = createHConnection();
|
||||||
|
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
|
||||||
|
BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
|
||||||
|
try (HTable ht = new HTable(conn, bufferParam)) {
|
||||||
|
MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
|
||||||
|
ht.multiAp = ap;
|
||||||
|
List<Get> gets = new LinkedList<>();
|
||||||
|
gets.add(new Get(DUMMY_BYTES_1));
|
||||||
|
gets.add(new Get(DUMMY_BYTES_2));
|
||||||
|
try {
|
||||||
|
ht.get(gets);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
// No result response on this test.
|
||||||
|
}
|
||||||
|
assertEquals(readTimeout, ap.previousTimeout);
|
||||||
|
ap.previousTimeout = -1;
|
||||||
|
|
||||||
|
try {
|
||||||
|
ht.existsAll(gets);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
// No result response on this test.
|
||||||
|
}
|
||||||
|
assertEquals(readTimeout, ap.previousTimeout);
|
||||||
|
ap.previousTimeout = -1;
|
||||||
|
|
||||||
|
List<Delete> deletes = new LinkedList<>();
|
||||||
|
deletes.add(new Delete(DUMMY_BYTES_1));
|
||||||
|
deletes.add(new Delete(DUMMY_BYTES_2));
|
||||||
|
ht.delete(deletes);
|
||||||
|
assertEquals(writeTimeout, ap.previousTimeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGlobalErrors() throws IOException {
|
public void testGlobalErrors() throws IOException {
|
||||||
ClusterConnection conn = new MyConnectionImpl(conf);
|
ClusterConnection conn = new MyConnectionImpl(conf);
|
||||||
|
|
Loading…
Reference in New Issue