HBASE-19096 Add RowMutions batch support in AsyncTable
Signed-off-by: Jerry He <jerryjch@apache.org>
This commit is contained in:
parent
8688da9e9c
commit
0c4c395538
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
|
@ -232,27 +232,19 @@ class AsyncBatchRpcRetryingCaller<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
|
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
|
||||||
List<CellScannable> cells) throws IOException {
|
List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
|
||||||
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
|
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
|
||||||
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
|
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
|
||||||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||||
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
||||||
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
|
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
|
||||||
// TODO: remove the extra for loop as we will iterate it in mutationBuilder.
|
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||||
if (!multiRequestBuilder.hasNonceGroup()) {
|
// multiRequestBuilder will be populated with region actions.
|
||||||
for (Action action : entry.getValue().actions) {
|
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
|
||||||
if (action.hasNonce()) {
|
// action list.
|
||||||
multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
|
RequestConverter.buildNoDataRegionActions(entry.getKey(),
|
||||||
break;
|
entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
|
||||||
}
|
mutationBuilder, nonceGroup, rowMutationsIndexMap);
|
||||||
}
|
|
||||||
}
|
|
||||||
regionActionBuilder.clear();
|
|
||||||
regionActionBuilder.setRegion(
|
|
||||||
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
|
|
||||||
regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
|
|
||||||
entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
|
|
||||||
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
|
|
||||||
}
|
}
|
||||||
return multiRequestBuilder.build();
|
return multiRequestBuilder.build();
|
||||||
}
|
}
|
||||||
|
@ -337,8 +329,12 @@ class AsyncBatchRpcRetryingCaller<T> {
|
||||||
}
|
}
|
||||||
ClientProtos.MultiRequest req;
|
ClientProtos.MultiRequest req;
|
||||||
List<CellScannable> cells = new ArrayList<>();
|
List<CellScannable> cells = new ArrayList<>();
|
||||||
|
// Map from a created RegionAction to the original index for a RowMutations within
|
||||||
|
// the original list of actions. This will be used to process the results when there
|
||||||
|
// is RowMutations in the action list.
|
||||||
|
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
|
||||||
try {
|
try {
|
||||||
req = buildReq(serverReq.actionsByRegion, cells);
|
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
onError(serverReq.actionsByRegion, tries, e, sn);
|
onError(serverReq.actionsByRegion, tries, e, sn);
|
||||||
return;
|
return;
|
||||||
|
@ -353,8 +349,8 @@ class AsyncBatchRpcRetryingCaller<T> {
|
||||||
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
|
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
onComplete(serverReq.actionsByRegion, tries, sn,
|
onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
|
||||||
ResponseConverter.getResults(req, resp, controller.cellScanner()));
|
rowMutationsIndexMap, resp, controller.cellScanner()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
onError(serverReq.actionsByRegion, tries, e, sn);
|
onError(serverReq.actionsByRegion, tries, e, sn);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -431,11 +431,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
|
* Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
|
||||||
* execution of the actions is not defined. Meaning if you do a Put and a Get in the same
|
* ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
|
||||||
* {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
|
* same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
|
||||||
* had put.
|
* Put had put.
|
||||||
* @param actions list of Get, Put, Delete, Increment, Append objects
|
* @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
|
||||||
* @return A list of {@link CompletableFuture}s that represent the result for each action.
|
* @return A list of {@link CompletableFuture}s that represent the result for each action.
|
||||||
*/
|
*/
|
||||||
<T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
|
<T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
|
||||||
|
@ -443,7 +443,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
|
||||||
/**
|
/**
|
||||||
* A simple version of batch. It will fail if there are any failures and you will get the whole
|
* A simple version of batch. It will fail if there are any failures and you will get the whole
|
||||||
* result list at once if the operation is succeeded.
|
* result list at once if the operation is succeeded.
|
||||||
* @param actions list of Get, Put, Delete, Increment, Append objects
|
* @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
|
||||||
* @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
|
* @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
|
||||||
*/
|
*/
|
||||||
default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
|
default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.Map;
|
||||||
import org.apache.hadoop.hbase.CellScannable;
|
import org.apache.hadoop.hbase.CellScannable;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -100,57 +98,29 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
|
||||||
(this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
|
(this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
|
||||||
|
|
||||||
long nonceGroup = multiAction.getNonceGroup();
|
long nonceGroup = multiAction.getNonceGroup();
|
||||||
if (nonceGroup != HConstants.NO_NONCE) {
|
|
||||||
multiRequestBuilder.setNonceGroup(nonceGroup);
|
|
||||||
}
|
|
||||||
// Index to track RegionAction within the MultiRequest
|
|
||||||
int regionActionIndex = -1;
|
|
||||||
// Map from a created RegionAction to the original index for a RowMutations within
|
// Map from a created RegionAction to the original index for a RowMutations within
|
||||||
// its original list of actions
|
// the original list of actions. This will be used to process the results when there
|
||||||
|
// is RowMutations in the action list.
|
||||||
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
|
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
|
||||||
// The multi object is a list of Actions by region. Iterate by region.
|
// The multi object is a list of Actions by region. Iterate by region.
|
||||||
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
|
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
|
||||||
final byte [] regionName = e.getKey();
|
final byte [] regionName = e.getKey();
|
||||||
final List<Action> actions = e.getValue();
|
final List<Action> actions = e.getValue();
|
||||||
regionActionBuilder.clear();
|
if (this.cellBlock) {
|
||||||
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
|
// Send data in cellblocks.
|
||||||
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
|
// multiRequestBuilder will be populated with region actions.
|
||||||
|
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
|
||||||
int rowMutations = 0;
|
// action list.
|
||||||
for (Action action : actions) {
|
RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
|
||||||
Row row = action.getAction();
|
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
|
||||||
// Row Mutations are a set of Puts and/or Deletes all to be applied atomically
|
|
||||||
// on the one row. We do separate RegionAction for each RowMutations.
|
|
||||||
// We maintain a map to keep track of this RegionAction and the original Action index.
|
|
||||||
if (row instanceof RowMutations) {
|
|
||||||
RowMutations rms = (RowMutations)row;
|
|
||||||
if (this.cellBlock) {
|
|
||||||
// Build a multi request absent its Cell payload. Send data in cellblocks.
|
|
||||||
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
|
|
||||||
regionActionBuilder, actionBuilder, mutationBuilder);
|
|
||||||
} else {
|
|
||||||
regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
|
|
||||||
}
|
|
||||||
regionActionBuilder.setAtomic(true);
|
|
||||||
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
|
|
||||||
regionActionIndex++;
|
|
||||||
rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
|
|
||||||
rowMutations++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
if (actions.size() > rowMutations) {
|
// multiRequestBuilder will be populated with region actions.
|
||||||
if (this.cellBlock) {
|
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
|
||||||
// Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
|
// action list.
|
||||||
// They have already been handled above. Guess at count of cells
|
RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
|
||||||
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
|
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
|
||||||
regionActionBuilder, actionBuilder, mutationBuilder);
|
|
||||||
} else {
|
|
||||||
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
|
|
||||||
regionActionBuilder, actionBuilder, mutationBuilder);
|
|
||||||
}
|
|
||||||
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
|
|
||||||
regionActionIndex++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
|
||||||
|
@ -623,19 +625,32 @@ public final class RequestConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a protocol buffer multi request for a list of actions.
|
* Create a protocol buffer multi request for a list of actions. Propagates Actions original
|
||||||
* Propagates Actions original index.
|
* index. The passed in multiRequestBuilder will be populated with region actions.
|
||||||
*
|
* @param regionName The region name of the actions.
|
||||||
* @param regionName
|
* @param actions The actions that are grouped by the same region name.
|
||||||
* @param actions
|
* @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
|
||||||
* @return a multi request
|
* @param regionActionBuilder regionActionBuilder to be used to build region action.
|
||||||
|
* @param actionBuilder actionBuilder to be used to build action.
|
||||||
|
* @param mutationBuilder mutationBuilder to be used to build mutation.
|
||||||
|
* @param nonceGroup nonceGroup to be applied.
|
||||||
|
* @param rowMutationsIndexMap Map of created RegionAction to the original index for a
|
||||||
|
* RowMutations within the original list of actions
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static RegionAction.Builder buildRegionAction(final byte[] regionName,
|
public static void buildRegionActions(final byte[] regionName,
|
||||||
final List<Action> actions, final RegionAction.Builder regionActionBuilder,
|
final List<Action> actions, final MultiRequest.Builder multiRequestBuilder,
|
||||||
|
final RegionAction.Builder regionActionBuilder,
|
||||||
final ClientProtos.Action.Builder actionBuilder,
|
final ClientProtos.Action.Builder actionBuilder,
|
||||||
final MutationProto.Builder mutationBuilder) throws IOException {
|
final MutationProto.Builder mutationBuilder,
|
||||||
|
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
|
||||||
|
regionActionBuilder.clear();
|
||||||
|
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
|
||||||
|
regionActionBuilder, regionName);
|
||||||
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
|
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
|
||||||
|
boolean hasNonce = false;
|
||||||
|
List<Action> rowMutationsList = new ArrayList<>();
|
||||||
|
|
||||||
for (Action action: actions) {
|
for (Action action: actions) {
|
||||||
Row row = action.getAction();
|
Row row = action.getAction();
|
||||||
actionBuilder.clear();
|
actionBuilder.clear();
|
||||||
|
@ -643,19 +658,21 @@ public final class RequestConverter {
|
||||||
mutationBuilder.clear();
|
mutationBuilder.clear();
|
||||||
if (row instanceof Get) {
|
if (row instanceof Get) {
|
||||||
Get g = (Get)row;
|
Get g = (Get)row;
|
||||||
regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
|
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
|
||||||
} else if (row instanceof Put) {
|
} else if (row instanceof Put) {
|
||||||
regionActionBuilder.addAction(actionBuilder.
|
builder.addAction(actionBuilder.
|
||||||
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
|
setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
|
||||||
} else if (row instanceof Delete) {
|
} else if (row instanceof Delete) {
|
||||||
regionActionBuilder.addAction(actionBuilder.
|
builder.addAction(actionBuilder.
|
||||||
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
|
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
|
||||||
} else if (row instanceof Append) {
|
} else if (row instanceof Append) {
|
||||||
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
|
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
|
||||||
MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
|
MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
|
||||||
|
hasNonce = true;
|
||||||
} else if (row instanceof Increment) {
|
} else if (row instanceof Increment) {
|
||||||
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
|
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
|
||||||
MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
|
MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
|
||||||
|
hasNonce = true;
|
||||||
} else if (row instanceof RegionCoprocessorServiceExec) {
|
} else if (row instanceof RegionCoprocessorServiceExec) {
|
||||||
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
|
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
|
||||||
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
|
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
|
||||||
|
@ -667,19 +684,39 @@ public final class RequestConverter {
|
||||||
} else {
|
} else {
|
||||||
cpBuilder.clear();
|
cpBuilder.clear();
|
||||||
}
|
}
|
||||||
regionActionBuilder.addAction(actionBuilder.setServiceCall(
|
builder.addAction(actionBuilder.setServiceCall(
|
||||||
cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow()))
|
cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow()))
|
||||||
.setServiceName(exec.getMethod().getService().getFullName())
|
.setServiceName(exec.getMethod().getService().getFullName())
|
||||||
.setMethodName(exec.getMethod().getName())
|
.setMethodName(exec.getMethod().getName())
|
||||||
.setRequest(value)));
|
.setRequest(value)));
|
||||||
} else if (row instanceof RowMutations) {
|
} else if (row instanceof RowMutations) {
|
||||||
// Skip RowMutations, which has been separately converted to RegionAction
|
rowMutationsList.add(action);
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
|
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return regionActionBuilder;
|
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
|
||||||
|
multiRequestBuilder.setNonceGroup(nonceGroup);
|
||||||
|
}
|
||||||
|
multiRequestBuilder.addRegionAction(builder.build());
|
||||||
|
|
||||||
|
// Process RowMutations here. We can not process it in the big loop above because
|
||||||
|
// it will corrupt the sequence order maintained in cells.
|
||||||
|
// RowMutations is a set of Puts and/or Deletes all to be applied atomically
|
||||||
|
// on the one row. We do separate RegionAction for each RowMutations.
|
||||||
|
// We maintain a map to keep track of this RegionAction and the original Action index.
|
||||||
|
for (Action action : rowMutationsList) {
|
||||||
|
RowMutations rms = (RowMutations) action.getAction();
|
||||||
|
RegionAction.Builder rowMutationsRegionActionBuilder =
|
||||||
|
RequestConverter.buildRegionAction(regionName, rms);
|
||||||
|
rowMutationsRegionActionBuilder.setAtomic(true);
|
||||||
|
// Put it in the multiRequestBuilder
|
||||||
|
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
|
||||||
|
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
|
||||||
|
// in the overall multiRequest.
|
||||||
|
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
|
||||||
|
action.getOriginalIndex());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -689,23 +726,35 @@ public final class RequestConverter {
|
||||||
* coming along otherwise. Note that Get is different. It does not contain 'data' and is always
|
* coming along otherwise. Note that Get is different. It does not contain 'data' and is always
|
||||||
* carried by protobuf. We return references to the data by adding them to the passed in
|
* carried by protobuf. We return references to the data by adding them to the passed in
|
||||||
* <code>data</code> param.
|
* <code>data</code> param.
|
||||||
*
|
* <p> Propagates Actions original index.
|
||||||
* <p>Propagates Actions original index.
|
* <p> The passed in multiRequestBuilder will be populated with region actions.
|
||||||
*
|
* @param regionName The region name of the actions.
|
||||||
* @param regionName
|
* @param actions The actions that are grouped by the same region name.
|
||||||
* @param actions
|
|
||||||
* @param cells Place to stuff references to actual data.
|
* @param cells Place to stuff references to actual data.
|
||||||
* @return a multi request that does not carry any data.
|
* @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
|
||||||
|
* @param regionActionBuilder regionActionBuilder to be used to build region action.
|
||||||
|
* @param actionBuilder actionBuilder to be used to build action.
|
||||||
|
* @param mutationBuilder mutationBuilder to be used to build mutation.
|
||||||
|
* @param nonceGroup nonceGroup to be applied.
|
||||||
|
* @param rowMutationsIndexMap Map of created RegionAction to the original index for a
|
||||||
|
* RowMutations within the original list of actions
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
|
public static void buildNoDataRegionActions(final byte[] regionName,
|
||||||
final Iterable<Action> actions, final List<CellScannable> cells,
|
final Iterable<Action> actions, final List<CellScannable> cells,
|
||||||
|
final MultiRequest.Builder multiRequestBuilder,
|
||||||
final RegionAction.Builder regionActionBuilder,
|
final RegionAction.Builder regionActionBuilder,
|
||||||
final ClientProtos.Action.Builder actionBuilder,
|
final ClientProtos.Action.Builder actionBuilder,
|
||||||
final MutationProto.Builder mutationBuilder) throws IOException {
|
final MutationProto.Builder mutationBuilder,
|
||||||
|
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
|
||||||
|
regionActionBuilder.clear();
|
||||||
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
|
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
|
||||||
regionActionBuilder, regionName);
|
regionActionBuilder, regionName);
|
||||||
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
|
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
|
||||||
|
RegionAction.Builder rowMutationsRegionActionBuilder = null;
|
||||||
|
boolean hasNonce = false;
|
||||||
|
List<Action> rowMutationsList = new ArrayList<>();
|
||||||
|
|
||||||
for (Action action: actions) {
|
for (Action action: actions) {
|
||||||
Row row = action.getAction();
|
Row row = action.getAction();
|
||||||
actionBuilder.clear();
|
actionBuilder.clear();
|
||||||
|
@ -740,11 +789,13 @@ public final class RequestConverter {
|
||||||
cells.add(a);
|
cells.add(a);
|
||||||
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
|
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
|
||||||
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
|
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
|
||||||
|
hasNonce = true;
|
||||||
} else if (row instanceof Increment) {
|
} else if (row instanceof Increment) {
|
||||||
Increment i = (Increment)row;
|
Increment i = (Increment)row;
|
||||||
cells.add(i);
|
cells.add(i);
|
||||||
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
|
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
|
||||||
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
|
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
|
||||||
|
hasNonce = true;
|
||||||
} else if (row instanceof RegionCoprocessorServiceExec) {
|
} else if (row instanceof RegionCoprocessorServiceExec) {
|
||||||
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
|
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
|
||||||
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
|
// DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
|
||||||
|
@ -762,13 +813,40 @@ public final class RequestConverter {
|
||||||
.setMethodName(exec.getMethod().getName())
|
.setMethodName(exec.getMethod().getName())
|
||||||
.setRequest(value)));
|
.setRequest(value)));
|
||||||
} else if (row instanceof RowMutations) {
|
} else if (row instanceof RowMutations) {
|
||||||
// Skip RowMutations, which has been separately converted to RegionAction
|
rowMutationsList.add(action);
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
|
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return builder;
|
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
|
||||||
|
multiRequestBuilder.setNonceGroup(nonceGroup);
|
||||||
|
}
|
||||||
|
multiRequestBuilder.addRegionAction(builder.build());
|
||||||
|
|
||||||
|
// Process RowMutations here. We can not process it in the big loop above because
|
||||||
|
// it will corrupt the sequence order maintained in cells.
|
||||||
|
// RowMutations is a set of Puts and/or Deletes all to be applied atomically
|
||||||
|
// on the one row. We do separate RegionAction for each RowMutations.
|
||||||
|
// We maintain a map to keep track of this RegionAction and the original Action index.
|
||||||
|
for (Action action : rowMutationsList) {
|
||||||
|
RowMutations rms = (RowMutations) action.getAction();
|
||||||
|
if (rowMutationsRegionActionBuilder == null) {
|
||||||
|
rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
|
||||||
|
} else {
|
||||||
|
rowMutationsRegionActionBuilder.clear();
|
||||||
|
}
|
||||||
|
rowMutationsRegionActionBuilder.setRegion(
|
||||||
|
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
|
||||||
|
rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
|
||||||
|
cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
|
||||||
|
rowMutationsRegionActionBuilder.setAtomic(true);
|
||||||
|
// Put it in the multiRequestBuilder
|
||||||
|
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
|
||||||
|
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
|
||||||
|
// in the overall multiRequest.
|
||||||
|
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
|
||||||
|
action.getOriginalIndex());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// End utilities for Client
|
// End utilities for Client
|
||||||
|
|
|
@ -71,6 +71,7 @@ public class TestAsyncTableBatch {
|
||||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
private static byte[] CQ = Bytes.toBytes("cq");
|
private static byte[] CQ = Bytes.toBytes("cq");
|
||||||
|
private static byte[] CQ1 = Bytes.toBytes("cq1");
|
||||||
|
|
||||||
private static int COUNT = 1000;
|
private static int COUNT = 1000;
|
||||||
|
|
||||||
|
@ -178,9 +179,9 @@ public class TestAsyncTableBatch {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMixed() throws InterruptedException, ExecutionException {
|
public void testMixed() throws InterruptedException, ExecutionException, IOException {
|
||||||
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
||||||
table.putAll(IntStream.range(0, 5)
|
table.putAll(IntStream.range(0, 7)
|
||||||
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
|
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
|
||||||
.collect(Collectors.toList())).get();
|
.collect(Collectors.toList())).get();
|
||||||
List<Row> actions = new ArrayList<>();
|
List<Row> actions = new ArrayList<>();
|
||||||
|
@ -189,8 +190,14 @@ public class TestAsyncTableBatch {
|
||||||
actions.add(new Delete(Bytes.toBytes(2)));
|
actions.add(new Delete(Bytes.toBytes(2)));
|
||||||
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
|
actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
|
||||||
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
|
actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
|
||||||
|
RowMutations rm = new RowMutations(Bytes.toBytes(5));
|
||||||
|
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100)));
|
||||||
|
rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200)));
|
||||||
|
actions.add(rm);
|
||||||
|
actions.add(new Get(Bytes.toBytes(6)));
|
||||||
|
|
||||||
List<Object> results = table.batchAll(actions).get();
|
List<Object> results = table.batchAll(actions).get();
|
||||||
assertEquals(5, results.size());
|
assertEquals(7, results.size());
|
||||||
Result getResult = (Result) results.get(0);
|
Result getResult = (Result) results.get(0);
|
||||||
assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
|
assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
|
||||||
assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
|
assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
|
||||||
|
@ -202,6 +209,12 @@ public class TestAsyncTableBatch {
|
||||||
assertEquals(12, appendValue.length);
|
assertEquals(12, appendValue.length);
|
||||||
assertEquals(4, Bytes.toLong(appendValue));
|
assertEquals(4, Bytes.toLong(appendValue));
|
||||||
assertEquals(4, Bytes.toInt(appendValue, 8));
|
assertEquals(4, Bytes.toInt(appendValue, 8));
|
||||||
|
assertEquals(100,
|
||||||
|
Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
|
||||||
|
assertEquals(200,
|
||||||
|
Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
|
||||||
|
getResult = (Result) results.get(6);
|
||||||
|
assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
|
public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
|
||||||
|
|
Loading…
Reference in New Issue