HBASE-19096 Add RowMutations batch support in AsyncTable

Signed-off-by: Jerry He <jerryjch@apache.org>
This commit is contained in:
Jerry He 2017-11-28 18:41:23 -08:00
parent 93b91e2cc6
commit e67a3699c4
5 changed files with 161 additions and 104 deletions

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -232,27 +232,19 @@ class AsyncBatchRpcRetryingCaller<T> {
}
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
List<CellScannable> cells) throws IOException {
List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
// TODO: remove the extra for loop as we will iterate it in mutationBuilder.
if (!multiRequestBuilder.hasNonceGroup()) {
for (Action action : entry.getValue().actions) {
if (action.hasNonce()) {
multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
break;
}
}
}
regionActionBuilder.clear();
regionActionBuilder.setRegion(
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there are any RowMutations in the
// action list.
RequestConverter.buildNoDataRegionActions(entry.getKey(),
entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
return multiRequestBuilder.build();
}
@ -337,8 +329,12 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ClientProtos.MultiRequest req;
List<CellScannable> cells = new ArrayList<>();
// Map from the index of a created RegionAction within the MultiRequest to the original index
// of its RowMutations within the original list of actions. This is used to process the
// results when there are RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
try {
req = buildReq(serverReq.actionsByRegion, cells);
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
@ -353,8 +349,8 @@ class AsyncBatchRpcRetryingCaller<T> {
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
} else {
try {
onComplete(serverReq.actionsByRegion, tries, sn,
ResponseConverter.getResults(req, resp, controller.cellScanner()));
onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
rowMutationsIndexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;

View File

@ -431,11 +431,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
}
/**
* Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
* execution of the actions is not defined. Meaning if you do a Put and a Get in the same
* {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
* had put.
* @param actions list of Get, Put, Delete, Increment, Append objects
* Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
* ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
* same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
* Put had put.
* @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
* @return A list of {@link CompletableFuture}s that represent the result for each action.
*/
<T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
@ -443,7 +443,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
/**
* A simple version of batch. It will fail if there are any failures and you will get the whole
* result list at once if the operation succeeds.
* @param actions list of Get, Put, Delete, Increment, Append objects
* @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
* @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
*/
default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {

View File

@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -100,57 +98,29 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
(this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
long nonceGroup = multiAction.getNonceGroup();
if (nonceGroup != HConstants.NO_NONCE) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
// Index to track RegionAction within the MultiRequest
int regionActionIndex = -1;
// Map from a created RegionAction to the original index for a RowMutations within
// its original list of actions
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action> actions = e.getValue();
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
int rowMutations = 0;
for (Action action : actions) {
Row row = action.getAction();
// Row Mutations are a set of Puts and/or Deletes all to be applied atomically
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
if (row instanceof RowMutations) {
RowMutations rms = (RowMutations)row;
if (this.cellBlock) {
// Build a multi request absent its Cell payload. Send data in cellblocks.
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
regionActionBuilder, actionBuilder, mutationBuilder);
} else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
// Send data in cellblocks.
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
regionActionBuilder.setAtomic(true);
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
regionActionIndex++;
rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
rowMutations++;
}
}
if (actions.size() > rowMutations) {
if (this.cellBlock) {
// Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
// They have already been handled above. Guess at count of cells
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
regionActionBuilder, actionBuilder, mutationBuilder);
} else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
regionActionBuilder, actionBuilder, mutationBuilder);
}
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
regionActionIndex++;
else {
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
@ -623,19 +625,32 @@ public final class RequestConverter {
}
/**
* Create a protocol buffer multi request for a list of actions.
* Propagates Actions original index.
*
* @param regionName
* @param actions
* @return a multi request
* Create a protocol buffer multi request for a list of actions. Propagates Actions original
* index. The passed in multiRequestBuilder will be populated with region actions.
* @param regionName The region name of the actions.
* @param actions The actions that are grouped by the same region name.
* @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
* @param regionActionBuilder regionActionBuilder to be used to build region action.
* @param actionBuilder actionBuilder to be used to build action.
* @param mutationBuilder mutationBuilder to be used to build mutation.
* @param nonceGroup nonceGroup to be applied.
* @param rowMutationsIndexMap Map from the index of a created RegionAction within the
* MultiRequest to the original index of its RowMutations within the original list of actions
* @throws IOException
*/
public static RegionAction.Builder buildRegionAction(final byte[] regionName,
final List<Action> actions, final RegionAction.Builder regionActionBuilder,
public static void buildRegionActions(final byte[] regionName,
final List<Action> actions, final MultiRequest.Builder multiRequestBuilder,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
final MutationProto.Builder mutationBuilder,
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
boolean hasNonce = false;
List<Action> rowMutationsList = new ArrayList<>();
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
@ -643,19 +658,21 @@ public final class RequestConverter {
mutationBuilder.clear();
if (row instanceof Get) {
Get g = (Get)row;
regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
regionActionBuilder.addAction(actionBuilder.
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
} else if (row instanceof Delete) {
regionActionBuilder.addAction(actionBuilder.
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
} else if (row instanceof Append) {
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
hasNonce = true;
} else if (row instanceof Increment) {
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@ -667,19 +684,39 @@ public final class RequestConverter {
} else {
cpBuilder.clear();
}
regionActionBuilder.addAction(actionBuilder.setServiceCall(
builder.addAction(actionBuilder.setServiceCall(
cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow()))
.setServiceName(exec.getMethod().getService().getFullName())
.setMethodName(exec.getMethod().getName())
.setRequest(value)));
} else if (row instanceof RowMutations) {
// Skip RowMutations, which has been separately converted to RegionAction
continue;
rowMutationsList.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
return regionActionBuilder;
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
multiRequestBuilder.addRegionAction(builder.build());
// Process RowMutations here. We can not process it in the big loop above because
// it will corrupt the sequence order maintained in cells.
// RowMutations is a set of Puts and/or Deletes all to be applied atomically
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
for (Action action : rowMutationsList) {
RowMutations rms = (RowMutations) action.getAction();
RegionAction.Builder rowMutationsRegionActionBuilder =
RequestConverter.buildRegionAction(regionName, rms);
rowMutationsRegionActionBuilder.setAtomic(true);
// Put it in the multiRequestBuilder
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
action.getOriginalIndex());
}
}
/**
@ -689,23 +726,35 @@ public final class RequestConverter {
* coming along otherwise. Note that Get is different. It does not contain 'data' and is always
* carried by protobuf. We return references to the data by adding them to the passed in
* <code>data</code> param.
*
* <p>Propagates Actions original index.
*
* @param regionName
* @param actions
* <p> Propagates Actions original index.
* <p> The passed in multiRequestBuilder will be populated with region actions.
* @param regionName The region name of the actions.
* @param actions The actions that are grouped by the same region name.
* @param cells Place to stuff references to actual data.
* @return a multi request that does not carry any data.
* @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
* @param regionActionBuilder regionActionBuilder to be used to build region action.
* @param actionBuilder actionBuilder to be used to build action.
* @param mutationBuilder mutationBuilder to be used to build mutation.
* @param nonceGroup nonceGroup to be applied.
* @param rowMutationsIndexMap Map from the index of a created RegionAction within the
* MultiRequest to the original index of its RowMutations within the original list of actions
* @throws IOException
*/
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
public static void buildNoDataRegionActions(final byte[] regionName,
final Iterable<Action> actions, final List<CellScannable> cells,
final MultiRequest.Builder multiRequestBuilder,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
final MutationProto.Builder mutationBuilder,
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
RegionAction.Builder rowMutationsRegionActionBuilder = null;
boolean hasNonce = false;
List<Action> rowMutationsList = new ArrayList<>();
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
@ -740,11 +789,13 @@ public final class RequestConverter {
cells.add(a);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
hasNonce = true;
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@ -762,13 +813,40 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName())
.setRequest(value)));
} else if (row instanceof RowMutations) {
// Skip RowMutations, which has been separately converted to RegionAction
continue;
rowMutationsList.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
return builder;
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
multiRequestBuilder.addRegionAction(builder.build());
// Process RowMutations here. We can not process it in the big loop above because
// it will corrupt the sequence order maintained in cells.
// RowMutations is a set of Puts and/or Deletes all to be applied atomically
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
for (Action action : rowMutationsList) {
RowMutations rms = (RowMutations) action.getAction();
if (rowMutationsRegionActionBuilder == null) {
rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
} else {
rowMutationsRegionActionBuilder.clear();
}
rowMutationsRegionActionBuilder.setRegion(
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
rowMutationsRegionActionBuilder.setAtomic(true);
// Put it in the multiRequestBuilder
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
action.getOriginalIndex());
}
}
// End utilities for Client

View File

@ -71,6 +71,7 @@ public class TestAsyncTableBatch {
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static byte[] CQ1 = Bytes.toBytes("cq1");
private static int COUNT = 1000;
@ -178,9 +179,9 @@ public class TestAsyncTableBatch {
}
@Test
public void testMixed() throws InterruptedException, ExecutionException {
public void testMixed() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
table.putAll(IntStream.range(0, 5)
table.putAll(IntStream.range(0, 7)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
.collect(Collectors.toList())).get();
List<Row> actions = new ArrayList<>();
@ -189,8 +190,14 @@ public class TestAsyncTableBatch {
actions.add(new Delete(Bytes.toBytes(2)));
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
RowMutations rm = new RowMutations(Bytes.toBytes(5));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100)));
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200)));
actions.add(rm);
actions.add(new Get(Bytes.toBytes(6)));
List<Object> results = table.batchAll(actions).get();
assertEquals(5, results.size());
assertEquals(7, results.size());
Result getResult = (Result) results.get(0);
assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
@ -202,6 +209,12 @@ public class TestAsyncTableBatch {
assertEquals(12, appendValue.length);
assertEquals(4, Bytes.toLong(appendValue));
assertEquals(4, Bytes.toInt(appendValue, 8));
assertEquals(100,
Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
assertEquals(200,
Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
getResult = (Result) results.get(6);
assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
}
public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {