HBASE-5901. Use union type protobufs instead of class/byte pairs for multi requests.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1332882 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-01 23:15:02 +00:00
parent c3262acafe
commit d369701569
4 changed files with 905 additions and 82 deletions

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Compare
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
@ -344,7 +345,7 @@ public final class RequestConverter {
+ mutation.getClass().getName());
}
Mutate mutate = ProtobufUtil.toMutate(mutateType, mutation);
builder.addAction(ProtobufUtil.toParameter(mutate));
builder.addAction(MultiAction.newBuilder().setMutate(mutate).build());
}
return builder.build();
}
@ -479,27 +480,28 @@ public final class RequestConverter {
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
for (Action<R> action: actions) {
Message protoAction = null;
MultiAction.Builder protoAction = MultiAction.newBuilder();
Row row = action.getAction();
if (row instanceof Get) {
protoAction = ProtobufUtil.toGet((Get)row);
protoAction.setGet(ProtobufUtil.toGet((Get)row));
} else if (row instanceof Put) {
protoAction = ProtobufUtil.toMutate(MutateType.PUT, (Put)row);
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
} else if (row instanceof Delete) {
protoAction = ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row);
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
} else if (row instanceof Exec) {
protoAction = ProtobufUtil.toExec((Exec)row);
protoAction.setExec(ProtobufUtil.toExec((Exec)row));
} else if (row instanceof Append) {
protoAction = ProtobufUtil.toMutate(MutateType.APPEND, (Append)row);
protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
} else if (row instanceof Increment) {
protoAction = ProtobufUtil.toMutate((Increment)row);
protoAction.setMutate(ProtobufUtil.toMutate((Increment)row));
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {
throw new DoNotRetryIOException(
"multi doesn't support " + row.getClass().getName());
}
builder.addAction(ProtobufUtil.toParameter(protoAction));
builder.addAction(protoAction.build());
}
return builder.build();
}

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
@ -1049,34 +1050,31 @@ public abstract class RegionServer implements
MultiResponse.Builder builder = MultiResponse.newBuilder();
if (request.hasAtomic() && request.getAtomic()) {
List<Mutate> mutates = new ArrayList<Mutate>();
for (NameBytesPair parameter: request.getActionList()) {
Object action = ProtobufUtil.toObject(parameter);
if (action instanceof Mutate) {
mutates.add((Mutate)action);
for (MultiAction actionUnion : request.getActionList()) {
if (actionUnion.hasMutate()) {
mutates.add(actionUnion.getMutate());
} else {
throw new DoNotRetryIOException(
"Unsupported atomic atction type: "
+ action.getClass().getName());
"Unsupported atomic action type: " + actionUnion);
}
}
mutateRows(region, mutates);
} else {
ActionResult.Builder resultBuilder = null;
List<Mutate> puts = new ArrayList<Mutate>();
for (NameBytesPair parameter: request.getActionList()) {
for (MultiAction actionUnion : request.getActionList()) {
requestCount.incrementAndGet();
try {
Object result = null;
Object action = ProtobufUtil.toObject(parameter);
if (action instanceof ClientProtos.Get) {
Get get = ProtobufUtil.toGet((ClientProtos.Get)action);
if (actionUnion.hasGet()) {
Get get = ProtobufUtil.toGet(actionUnion.getGet());
Integer lock = getLockFromId(get.getLockId());
Result r = region.get(get, lock);
if (r != null) {
result = ProtobufUtil.toResult(r);
}
} else if (action instanceof Mutate) {
Mutate mutate = (Mutate)action;
} else if (actionUnion.hasMutate()) {
Mutate mutate = actionUnion.getMutate();
MutateType type = mutate.getMutateType();
if (type != MutateType.PUT) {
if (!puts.isEmpty()) {
@ -1110,12 +1108,12 @@ public abstract class RegionServer implements
if (r != null) {
result = ProtobufUtil.toResult(r);
}
} else if (action instanceof ClientProtos.Exec) {
Exec call = ProtobufUtil.toExec((ClientProtos.Exec)action);
} else if (actionUnion.hasExec()) {
Exec call = ProtobufUtil.toExec(actionUnion.getExec());
result = region.exec(call).getValue();
} else {
LOG.debug("Error: invalid action, "
+ "it must be a Get, Mutate, or Exec.");
LOG.warn("Error: invalid action: " + actionUnion + ". " +
"it must be a Get, Mutate, or Exec.");
throw new DoNotRetryIOException("Invalid action, "
+ "it must be a Get, Mutate, or Exec.");
}

View File

@ -299,6 +299,16 @@ message ExecCoprocessorResponse {
required NameBytesPair value = 1;
}
/**
* An action that is part of MultiRequest.
* This is a union type - exactly one of the fields will be set.
*/
message MultiAction {
optional Mutate mutate = 1;
optional Get get = 2;
optional Exec exec = 3;
}
/**
* An individual action result. The result will in the
* same order as the action in the request. If an action
@ -321,7 +331,7 @@ message ActionResult {
*/
message MultiRequest {
required RegionSpecifier region = 1;
repeated NameBytesPair action = 2;
repeated MultiAction action = 2;
optional bool atomic = 3;
}
@ -329,6 +339,7 @@ message MultiResponse {
repeated ActionResult result = 1;
}
service ClientService {
rpc get(GetRequest)
returns(GetResponse);