HBASE-17904 Get runs into NoSuchElementException when using Read Replica, with hbase. ipc.client.specificThreadForWriting to be true and hbase.rpc.client.impl to be org.apache.hadoop.hbase.ipc.RpcClientImpl
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
a2e4e0a1eb
commit
cdda1d0302
|
@ -152,7 +152,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void remove(Call call) {
|
public void remove(Call call) {
|
||||||
callsToWrite.remove();
|
callsToWrite.remove(call);
|
||||||
// By removing the call from the expected call list, we make the list smaller, but
|
// By removing the call from the expected call list, we make the list smaller, but
|
||||||
// it means as well that we don't know how many calls we cancelled.
|
// it means as well that we don't know how many calls we cancelled.
|
||||||
calls.remove(call.id);
|
calls.remove(call.id);
|
||||||
|
|
|
@ -521,4 +521,54 @@ public class TestReplicaWithCluster {
|
||||||
HTU.deleteTable(hdt.getTableName());
|
HTU.deleteTable(hdt.getTableName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicaGetWithRpcClientImpl() throws IOException {
|
||||||
|
HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
|
||||||
|
HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl");
|
||||||
|
// Create table then get the single region for our new table.
|
||||||
|
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl");
|
||||||
|
hdt.setRegionReplication(NB_SERVERS);
|
||||||
|
hdt.addCoprocessor(SlowMeCopro.class.getName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
|
||||||
|
|
||||||
|
Put p = new Put(row);
|
||||||
|
p.addColumn(f, row, row);
|
||||||
|
table.put(p);
|
||||||
|
|
||||||
|
// Flush so it can be picked by the replica refresher thread
|
||||||
|
HTU.flush(table.getName());
|
||||||
|
|
||||||
|
// Sleep for some time until data is picked up by replicas
|
||||||
|
try {
|
||||||
|
Thread.sleep(2 * REFRESH_PERIOD);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
LOG.error(e1);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Create the new connection so new config can kick in
|
||||||
|
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||||
|
Table t = connection.getTable(hdt.getTableName());
|
||||||
|
|
||||||
|
// But if we ask for stale we will get it
|
||||||
|
SlowMeCopro.cdl.set(new CountDownLatch(1));
|
||||||
|
Get g = new Get(row);
|
||||||
|
g.setConsistency(Consistency.TIMELINE);
|
||||||
|
Result r = t.get(g);
|
||||||
|
Assert.assertTrue(r.isStale());
|
||||||
|
SlowMeCopro.cdl.get().countDown();
|
||||||
|
} finally {
|
||||||
|
SlowMeCopro.cdl.get().countDown();
|
||||||
|
SlowMeCopro.sleepTime.set(0);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
|
||||||
|
HTU.getConfiguration().unset("hbase.rpc.client.impl");
|
||||||
|
HTU.getHBaseAdmin().disableTable(hdt.getTableName());
|
||||||
|
HTU.deleteTable(hdt.getTableName());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue