HBASE-7397 HTable.coprocessorService() should allow null values in returned Map
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1424282 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c8486bb705
commit
cdf88c07dc
|
@ -30,7 +30,6 @@ import java.util.Map;
|
|||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -1259,18 +1258,12 @@ public class HTable implements HTableInterface {
|
|||
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
|
||||
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
|
||||
throws ServiceException, Throwable {
|
||||
final Map<byte[],R> results = new ConcurrentSkipListMap<byte[], R>(Bytes.BYTES_COMPARATOR);
|
||||
final Map<byte[],R> results = Collections.synchronizedMap(
|
||||
new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
|
||||
coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
|
||||
public void update(byte[] region, byte[] row, R value) {
|
||||
if (value == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Call to " + service.getName() +
|
||||
" received NULL value from Batch.Call for region " + Bytes.toStringBinary(region));
|
||||
}
|
||||
} else {
|
||||
results.put(region, value);
|
||||
}
|
||||
}
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
|
|
@ -256,6 +256,44 @@ public class TestCoprocessorEndpoint {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoprocessorServiceNullResponse() throws Throwable {
|
||||
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
|
||||
NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
|
||||
|
||||
final TestProtos.EchoRequestProto request =
|
||||
TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
|
||||
try {
|
||||
// scan: for all regions
|
||||
final RpcController controller = new ServerRpcController();
|
||||
// test that null results are supported
|
||||
Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
|
||||
ROWS[0], ROWS[ROWS.length - 1],
|
||||
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
|
||||
public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
|
||||
throws IOException {
|
||||
BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
|
||||
instance.echo(controller, request, callback);
|
||||
TestProtos.EchoResponseProto response = callback.get();
|
||||
LOG.debug("Batch.Call got result " + response);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
for (Map.Entry<byte[], String> e : results.entrySet()) {
|
||||
LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
|
||||
}
|
||||
assertEquals(3, results.size());
|
||||
for (HRegionInfo info : regions.navigableKeySet()) {
|
||||
LOG.info("Region info is "+info.getRegionNameAsString());
|
||||
assertTrue(results.containsKey(info.getRegionName()));
|
||||
assertNull(results.get(info.getRegionName()));
|
||||
}
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMasterCoprocessorService() throws Throwable {
|
||||
HBaseAdmin admin = util.getHBaseAdmin();
|
||||
|
|
Loading…
Reference in New Issue