HBASE-17904 Get runs into NoSuchElementException when using Read Replica, with hbase. ipc.client.specificThreadForWriting

to be true and hbase.rpc.client.impl to be org.apache.hadoop.hbase.ipc.RpcClientImpl (Huaxiang Sun)
This commit is contained in:
Michael Stack 2017-04-16 11:00:57 -07:00
parent 0cd4cec5d2
commit 7678855fac
2 changed files with 51 additions and 1 deletions

View File

@ -156,7 +156,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
}
public void remove(Call call) {
callsToWrite.remove();
callsToWrite.remove(call);
// By removing the call from the expected call list, we make the list smaller, but
// it means as well that we don't know how many calls we cancelled.
calls.remove(call.id);

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@ -515,7 +516,56 @@ public class TestReplicaWithCluster {
Assert.assertTrue(r.isStale());
} finally {
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
@Test
public void testReplicaGetWithRpcClientImpl() throws IOException {
HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
HTU.getConfiguration().set("hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.RpcClientImpl");
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithRpcClientImpl");
hdt.setRegionReplication(NB_SERVERS);
hdt.addCoprocessor(SlowMeCopro.class.getName());
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
// Flush so it can be picked by the replica refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1);
}
try {
// Create the new connection so new config can kick in
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table t = connection.getTable(hdt.getTableName());
// But if we ask for stale we will get it
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = t.get(g);
Assert.assertTrue(r.isStale());
SlowMeCopro.cdl.get().countDown();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
} finally {
HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
HTU.getConfiguration().unset("hbase.rpc.client.impl");
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}