HBASE-9612 Ability to batch edits destined to different regions -- REVERT

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1527789 13f79535-47bb-0310-9956-ffa450edef68
Author: Michael Stack
Date: 2013-09-30 21:27:33 +00:00
Parent: dc959eadc2
Commit: 5987ee67e1
20 changed files with 2461 additions and 3375 deletions

ClientSmallScanner.java

@@ -171,7 +171,6 @@ public class ClientSmallScanner extends ClientScanner {
     ScanResponse response = null;
     PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
     try {
-      controller.setPriority(getTableName());
       response = getStub().scan(controller, request);
       return ResponseConverter.getResults(controller.cellScanner(),
           response);

HBaseAdmin.java

@@ -643,7 +643,6 @@ public class HBaseAdmin implements Abortable, Closeable {
           .getServerName());
       PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
       try {
-        controller.setPriority(tableName);
         ScanResponse response = server.scan(controller, request);
         values = ResponseConverter.getResults(controller.cellScanner(), response);
       } catch (ServiceException se) {

HTable.java

@@ -961,8 +961,6 @@ public class HTable implements HTableInterface {
       try {
         MultiRequest request = RequestConverter.buildMultiRequest(
           getLocation().getRegionInfo().getRegionName(), rm);
-        PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
-        pcrc.setPriority(tableName);
        getStub().multi(null, request);
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
@@ -989,7 +987,6 @@ public class HTable implements HTableInterface {
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), append);
         PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-        rpcController.setPriority(getTableName());
         MutateResponse response = getStub().mutate(rpcController, request);
         if (!response.hasResult()) return null;
         return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
@@ -1016,10 +1013,9 @@ public class HTable implements HTableInterface {
       try {
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), increment);
-        PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-        rpcController.setPriority(getTableName());
-        MutateResponse response = getStub().mutate(rpcController, request);
-        return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
+        PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
+        MutateResponse response = getStub().mutate(rpcContoller, request);
+        return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
       }
@@ -1078,7 +1074,6 @@ public class HTable implements HTableInterface {
           getLocation().getRegionInfo().getRegionName(), row, family,
           qualifier, amount, durability);
         PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
-        rpcController.setPriority(getTableName());
         MutateResponse response = getStub().mutate(rpcController, request);
         Result result =
           ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());

MultiServerCallable.java

@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -84,11 +84,8 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
           multiRequest = RequestConverter.buildMultiRequest(regionName, rms);
         }
         // Carry the cells if any over the proxy/pb Service interface using the payload
-        // carrying rpc controller. Also set priority on this controller so available down
-        // in RpcClient when we go to craft the request header.
-        PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cells);
-        pcrc.setPriority(getTableName());
-        getStub().multi(pcrc, multiRequest);
+        // carrying rpc controller.
+        getStub().multi(new PayloadCarryingRpcController(cells), multiRequest);
         // This multi call does not return results.
         response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
       } catch (ServiceException se) {
@@ -116,7 +113,6 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
       // Controller optionally carries cell data over the proxy/service boundary and also
       // optionally ferries cell response data back out again.
       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
-      controller.setPriority(getTableName());
       ClientProtos.MultiResponse responseProto = getStub().multi(controller, multiRequest);
       results = ResponseConverter.getResults(responseProto, controller.cellScanner());
     } catch (ServiceException se) {

ScannerCallable.java

@@ -163,7 +163,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       ScanResponse response = null;
       PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
       try {
-        controller.setPriority(getTableName());
         response = getStub().scan(controller, request);
         // Client and RS maintain a nextCallSeq number during the scan. Every next() call
         // from client to server will increment this number in both sides. Client passes this

PayloadCarryingRpcController.java

@@ -23,8 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
+
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -37,15 +36,6 @@ import com.google.protobuf.RpcController;
  */
 @InterfaceAudience.Private
 public class PayloadCarryingRpcController implements RpcController, CellScannable {
-  /**
-   * Priority to set on this request. Set it here in controller so available composing the
-   * request. This is the ordained way of setting priorities going forward. We will be
-   * undoing the old annotation-based mechanism.
-   */
-  // Currently only multi call makes use of this. Eventually this should be only way to set
-  // priority.
-  private int priority = 0;
-
   // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException

   /**
@@ -113,26 +103,4 @@ public class PayloadCarryingRpcController implements RpcController, CellScannable {
   public void startCancel() {
     throw new UnsupportedOperationException();
   }
-
-  /**
-   * @param priority Priority for this request; should fall roughly in the range
-   * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
-   */
-  public void setPriority(int priority) {
-    this.priority = priority;
-  }
-
-  /**
-   * @param tn Set priority based off the table we are going against.
-   */
-  public void setPriority(final TableName tn) {
-    this.priority = tn != null && tn.isSystemTable()? HConstants.HIGH_QOS: HConstants.NORMAL_QOS;
-  }
-
-  /**
-   * @return The priority of this request
-   */
-  public int getPriority() {
-    return priority;
-  }
 }
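
For reference, the priority rule deleted above reduces to a single table-based check: RPCs against system tables get HIGH_QOS, everything else NORMAL_QOS. A minimal standalone sketch of that rule (the class and the boolean parameter are illustrative stand-ins, not the real HConstants/TableName API; the 0/100 values come from the generated-code comments further down):

// Hypothetical sketch of the reverted priority rule, not HBase code.
public class PrioritySketch {
  static final int NORMAL_QOS = 0;   // "0 is NORMAL priority" per the pb comments
  static final int HIGH_QOS = 100;   // "100 is HIGH" per the pb comments

  // Mirrors the removed setPriority(TableName): bump system-table RPCs so
  // they can be serviced ahead of user-table traffic.
  static int priorityFor(boolean isSystemTable) {
    return isSystemTable ? HIGH_QOS : NORMAL_QOS;
  }

  public static void main(String[] args) {
    System.out.println(priorityFor(true));   // 100, e.g. for hbase:meta
    System.out.println(priorityFor(false));  // 0 for user tables
  }
}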

RpcClient.java

@@ -1002,10 +1002,9 @@ public class RpcClient {
      * Note: this is not called from the Connection thread, but by other
      * threads.
      * @param call
-     * @param priority
      * @see #readResponse()
      */
-    protected void writeRequest(Call call, final int priority) {
+    protected void writeRequest(Call call) {
       if (shouldCloseConnection.get()) return;
       try {
         RequestHeader.Builder builder = RequestHeader.newBuilder();
@@ -1023,8 +1022,6 @@ public class RpcClient {
           cellBlockBuilder.setLength(cellBlock.limit());
           builder.setCellBlockMeta(cellBlockBuilder.build());
         }
-        // Only pass priority if there one. Let zero be same as no priority.
-        if (priority != 0) builder.setPriority(priority);
         //noinspection SynchronizeOnNonFinalField
         RequestHeader header = builder.build();
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
@@ -1383,12 +1380,6 @@ public class RpcClient {
       }
     }

-  Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
-      Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
-  throws InterruptedException, IOException {
-    return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
-  }
-
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
    * with the <code>ticket</code> credentials, returning the value.
@@ -1409,12 +1400,12 @@ public class RpcClient {
    */
   Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
       Message returnType, User ticket, InetSocketAddress addr,
-      int rpcTimeout, int priority)
+      int rpcTimeout)
   throws InterruptedException, IOException {
     Call call = new Call(md, param, cells, returnType);
     Connection connection =
       getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
-    connection.writeRequest(call, priority); // send the parameter
+    connection.writeRequest(call); // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
@@ -1641,8 +1632,7 @@ public class RpcClient {
       }
       Pair<Message, CellScanner> val = null;
       try {
-        val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
-          pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
+        val = call(md, param, cells, returnType, ticket, isa, rpcTimeout);
         if (pcrc != null) {
           // Shove the results into controller so can be carried across the proxy/pb service void.
           if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());
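
Note the wire-level convention this revert also drops: the pre-revert client serialized a priority into the RequestHeader only when it was nonzero, so zero and "field absent" both meant NORMAL. A hypothetical sketch of that defaulting (the Header class is a stand-in for the generated RequestHeader.Builder, not the real API):

// Hypothetical stand-in for the generated RequestHeader.Builder.
public class HeaderPrioritySketch {
  static final class Header {
    Integer priority;  // null models "field absent on the wire"
  }

  // Mirrors the removed line: if (priority != 0) builder.setPriority(priority);
  static Header build(int priority) {
    Header h = new Header();
    if (priority != 0) h.priority = priority;  // zero == absent == NORMAL
    return h;
  }

  public static void main(String[] args) {
    System.out.println(build(0).priority);    // null -> treated as NORMAL
    System.out.println(build(100).priority);  // 100 -> HIGH
  }
}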

ProtobufUtil.java

@@ -2257,11 +2257,11 @@ public final class ProtobufUtil {
         ", row=" + getStringForByteString(r.getGet().getRow());
     } else if (m instanceof ClientProtos.MultiRequest) {
       ClientProtos.MultiRequest r = (ClientProtos.MultiRequest) m;
-      ClientProtos.RegionMutation rm = r.getRegionMutationList().get(0);
-      return "region= " + getStringForByteString(rm.getRegion().getValue()) +
-          ", for " + r.getRegionMutationCount() +
-          " actions and 1st row key=" + getStringForByteString(rm.getMutationCount() > 0?
-            rm.getMutation(0).getRow(): ByteString.EMPTY);
+      ClientProtos.MultiAction action = r.getActionList().get(0);
+      return "region= " + getStringForByteString(r.getRegion().getValue()) +
+          ", for " + r.getActionCount() +
+          " actions and 1st row key=" + getStringForByteString(action.hasMutation() ?
+            action.getMutation().getRow() : action.getGet().getRow());
     } else if (m instanceof ClientProtos.MutateRequest) {
       ClientProtos.MutateRequest r = (ClientProtos.MutateRequest) m;
       return "region= " + getStringForByteString(r.getRegion().getValue()) +

RequestConverter.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@@ -70,7 +71,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutation;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -365,11 +365,10 @@ public final class RequestConverter {
    * @return a multi request
    * @throws IOException
    */
-  public static MultiRequest buildMultiRequest(final byte [] regionName,
+  public static MultiRequest buildMultiRequest(final byte[] regionName,
       final RowMutations rowMutations)
   throws IOException {
-    RegionMutation.Builder builder =
-      getRegionMutationBuilderWithRegionAndAtomicSet(regionName, true);
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType mutateType = null;
       if (mutation instanceof Put) {
@@ -381,9 +380,9 @@ public final class RequestConverter {
           mutation.getClass().getName());
       }
       MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
-      builder.addMutation(mp);
+      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
     }
-    return createMultiRequest(builder.build());
+    return builder.build();
   }

   /**
@@ -399,8 +398,7 @@ public final class RequestConverter {
   public static MultiRequest buildNoDataMultiRequest(final byte[] regionName,
       final RowMutations rowMutations, final List<CellScannable> cells)
   throws IOException {
-    RegionMutation.Builder builder =
-      getRegionMutationBuilderWithRegionAndAtomicSet(regionName, true);
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType type = null;
       if (mutation instanceof Put) {
@@ -413,18 +411,14 @@ public final class RequestConverter {
       }
       MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
       cells.add(mutation);
-      builder.addMutation(mp);
+      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
     }
-    return createMultiRequest(builder.build());
+    return builder.build();
   }

-  private static MultiRequest createMultiRequest(final RegionMutation rm) {
-    return MultiRequest.newBuilder().addRegionMutation(rm).build();
-  }
-
-  private static RegionMutation.Builder getRegionMutationBuilderWithRegionAndAtomicSet(
-      final byte [] regionName, final boolean atomic) {
-    RegionMutation.Builder builder = RegionMutation.newBuilder();
+  private static MultiRequest.Builder getMultiRequestBuilderWithRegionAndAtomicSet(final byte [] regionName,
+      final boolean atomic) {
+    MultiRequest.Builder builder = MultiRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
     return builder.setAtomic(atomic);
@@ -526,27 +520,29 @@ public final class RequestConverter {
   public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
       final List<Action<R>> actions)
   throws IOException {
-    RegionMutation.Builder builder =
-      getRegionMutationBuilderWithRegionAndAtomicSet(regionName, false);
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
     for (Action<R> action: actions) {
+      MultiAction.Builder protoAction = MultiAction.newBuilder();
       Row row = action.getAction();
       if (row instanceof Get) {
-        throw new UnsupportedOperationException("Removed");
+        protoAction.setGet(ProtobufUtil.toGet((Get)row));
       } else if (row instanceof Put) {
-        builder.addMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
       } else if (row instanceof Delete) {
-        builder.addMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
       } else if (row instanceof Append) {
-        builder.addMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
+        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
       } else if (row instanceof Increment) {
-        builder.addMutation(ProtobufUtil.toMutation((Increment)row));
+        protoAction.setMutation(ProtobufUtil.toMutation((Increment)row));
       } else if (row instanceof RowMutations) {
        continue; // ignore RowMutations
       } else {
-        throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
+        throw new DoNotRetryIOException(
+          "multi doesn't support " + row.getClass().getName());
       }
+      builder.addAction(protoAction.build());
     }
-    return createMultiRequest(builder.build());
+    return builder.build();
   }

   /**
@@ -568,16 +564,17 @@ public final class RequestConverter {
   public static <R> MultiRequest buildNoDataMultiRequest(final byte[] regionName,
       final List<Action<R>> actions, final List<CellScannable> cells)
   throws IOException {
-    RegionMutation.Builder builder =
-      getRegionMutationBuilderWithRegionAndAtomicSet(regionName, false);
+    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
     for (Action<R> action: actions) {
+      MultiAction.Builder protoAction = MultiAction.newBuilder();
       Row row = action.getAction();
       if (row instanceof Get) {
-        throw new UnsupportedOperationException("Removed");
+        // Gets are carried by protobufs.
+        protoAction.setGet(ProtobufUtil.toGet((Get)row));
       } else if (row instanceof Put) {
         Put p = (Put)row;
         cells.add(p);
@@ -588,25 +585,26 @@ public final class RequestConverter {
         // metadata only in the pb and then send the kv along the side in cells.
         if (size > 0) {
           cells.add(d);
-          builder.addMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
+          protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
         } else {
-          builder.addMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
+          protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
         }
       } else if (row instanceof Append) {
         Append a = (Append)row;
         cells.add(a);
-        builder.addMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
+        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
       } else if (row instanceof Increment) {
         Increment i = (Increment)row;
         cells.add(i);
-        builder.addMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
+        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
       } else if (row instanceof RowMutations) {
         continue; // ignore RowMutations
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
+      builder.addAction(protoAction.build());
     }
-    return createMultiRequest(builder.build());
+    return builder.build();
   }

   // End utilities for Client
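
The practical consequence of the restored converters is the one named in the commit title: a MultiRequest once again carries exactly one region, so a client holding edits destined for several regions has to group them and send one request per region. A hypothetical sketch of that grouping (Edit and the region keys are illustrative, not HBase types):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one MultiRequest per region after this revert.
public class PerRegionBatching {
  record Edit(String region, String row) {}

  static Map<String, List<Edit>> groupByRegion(List<Edit> edits) {
    Map<String, List<Edit>> byRegion = new HashMap<>();
    for (Edit e : edits) {
      byRegion.computeIfAbsent(e.region(), k -> new ArrayList<>()).add(e);
    }
    return byRegion;  // build one MultiRequest per map entry
  }

  public static void main(String[] args) {
    List<Edit> edits = List.of(
        new Edit("regionA", "row1"),
        new Edit("regionB", "row2"),
        new Edit("regionA", "row3"));
    System.out.println(groupByRegion(edits).size());  // 2 regions -> 2 requests
  }
}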

ResponseConverter.java

@@ -38,10 +38,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutationResult;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
@@ -74,20 +73,20 @@ public final class ResponseConverter {
    * @return the results that were in the MultiResponse (a Result or an Exception).
    * @throws IOException
    */
-  public static List<Object> getResults(final MultiResponse proto,
+  public static List<Object> getResults(final ClientProtos.MultiResponse proto,
       final CellScanner cells)
   throws IOException {
     List<Object> results = new ArrayList<Object>();
-    for (RegionMutationResult result: proto.getRegionMutationResultList()) {
-      for (ResultOrException resultOrException: result.getResultOrExceptionList()) {
-        if (resultOrException.hasException()) {
-          results.add(ProtobufUtil.toException(resultOrException.getException()));
-        } else if (resultOrException.hasResult()) {
-          results.add(ProtobufUtil.toResult(resultOrException.getResult(), cells));
-        } else {
-          // Just a placeholder
-          results.add(new Result());
-        }
+    List<ActionResult> resultList = proto.getResultList();
+    for (int i = 0, n = resultList.size(); i < n; i++) {
+      ActionResult result = resultList.get(i);
+      if (result.hasException()) {
+        results.add(ProtobufUtil.toException(result.getException()));
+      } else if (result.hasValue()) {
+        ClientProtos.Result value = result.getValue();
+        results.add(ProtobufUtil.toResult(value, cells));
+      } else {
+        results.add(new Result());
       }
     }
     return results;
@@ -99,22 +98,14 @@ public final class ResponseConverter {
    * @param t
    * @return an action result
    */
-  public static ResultOrException buildActionResult(final Throwable t) {
-    ResultOrException.Builder builder = ResultOrException.newBuilder();
-    builder.setException(buildException(t));
-    return builder.build();
-  }
-
-  /**
-   * @param t
-   * @return NameValuePair of the exception name to stringified version os exception.
-   */
-  public static NameBytesPair buildException(final Throwable t) {
+  public static ActionResult buildActionResult(final Throwable t) {
+    ActionResult.Builder builder = ActionResult.newBuilder();
     NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
     parameterBuilder.setName(t.getClass().getName());
     parameterBuilder.setValue(
       ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
-    return parameterBuilder.build();
+    builder.setException(parameterBuilder.build());
+    return builder.build();
   }

   /**
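
With the restored schema the response is a flat list aligned index-for-index with the submitted actions; each entry carries an exception, a value, or nothing (an empty placeholder). A simplified hypothetical mirror of getResults() above (the record stands in for the generated ActionResult message, not the real protobuf class):

import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the restored result handling, not HBase code.
public class ActionResultSketch {
  record ActionResult(Throwable exception, String value) {}

  static List<Object> getResults(List<ActionResult> protoResults) {
    List<Object> results = new ArrayList<>();
    for (ActionResult r : protoResults) {
      if (r.exception() != null) {
        results.add(r.exception());   // failed action -> its exception
      } else if (r.value() != null) {
        results.add(r.value());       // action that returned a value
      } else {
        results.add("");              // placeholder, like new Result()
      }
    }
    return results;  // same order as the actions in the request
  }

  public static void main(String[] args) {
    List<ActionResult> in = List.of(
        new ActionResult(null, "v1"),
        new ActionResult(new RuntimeException("boom"), null),
        new ActionResult(null, null));
    System.out.println(getResults(in));
  }
}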

README

@@ -25,6 +25,7 @@ terminal and hit return -- the protoc compiler runs fast):
   do
     protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
   done
+  ll $JAVA_DIR/org/apache/hadoop/hbase/protobuf/generated

 After you've done the above, check it in and then check it in (or post a patch
 on a JIRA with your definition file changes and the generated files).

RPCProtos.java (generated)

@@ -3662,26 +3662,6 @@ public final class RPCProtos {
      * </pre>
      */
     org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMetaOrBuilder getCellBlockMetaOrBuilder();
-
-    // optional uint32 priority = 6;
-    /**
-     * <code>optional uint32 priority = 6;</code>
-     *
-     * <pre>
-     * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-     * See HConstants.
-     * </pre>
-     */
-    boolean hasPriority();
-    /**
-     * <code>optional uint32 priority = 6;</code>
-     *
-     * <pre>
-     * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-     * See HConstants.
-     * </pre>
-     */
-    int getPriority();
   }
   /**
    * Protobuf type {@code RequestHeader}
@@ -3779,11 +3759,6 @@ public final class RPCProtos {
               bitField0_ |= 0x00000010;
               break;
             }
-            case 48: {
-              bitField0_ |= 0x00000020;
-              priority_ = input.readUInt32();
-              break;
-            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3971,39 +3946,12 @@ public final class RPCProtos {
       return cellBlockMeta_;
     }

-    // optional uint32 priority = 6;
-    public static final int PRIORITY_FIELD_NUMBER = 6;
-    private int priority_;
-    /**
-     * <code>optional uint32 priority = 6;</code>
-     *
-     * <pre>
-     * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-     * See HConstants.
-     * </pre>
-     */
-    public boolean hasPriority() {
-      return ((bitField0_ & 0x00000020) == 0x00000020);
-    }
-    /**
-     * <code>optional uint32 priority = 6;</code>
-     *
-     * <pre>
-     * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-     * See HConstants.
-     * </pre>
-     */
-    public int getPriority() {
-      return priority_;
-    }
-
     private void initFields() {
       callId_ = 0;
       traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
       methodName_ = "";
       requestParam_ = false;
       cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
-      priority_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4032,9 +3980,6 @@ public final class RPCProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(5, cellBlockMeta_);
       }
-      if (((bitField0_ & 0x00000020) == 0x00000020)) {
-        output.writeUInt32(6, priority_);
-      }
       getUnknownFields().writeTo(output);
     }
@@ -4064,10 +4009,6 @@ public final class RPCProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, cellBlockMeta_);
       }
-      if (((bitField0_ & 0x00000020) == 0x00000020)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeUInt32Size(6, priority_);
-      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4116,11 +4057,6 @@ public final class RPCProtos {
         result = result && getCellBlockMeta()
             .equals(other.getCellBlockMeta());
       }
-      result = result && (hasPriority() == other.hasPriority());
-      if (hasPriority()) {
-        result = result && (getPriority()
-            == other.getPriority());
-      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4154,10 +4090,6 @@ public final class RPCProtos {
         hash = (37 * hash) + CELL_BLOCK_META_FIELD_NUMBER;
         hash = (53 * hash) + getCellBlockMeta().hashCode();
       }
-      if (hasPriority()) {
-        hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
-        hash = (53 * hash) + getPriority();
-      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4291,8 +4223,6 @@ public final class RPCProtos {
           cellBlockMetaBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
-        priority_ = 0;
-        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
@@ -4349,10 +4279,6 @@ public final class RPCProtos {
         } else {
           result.cellBlockMeta_ = cellBlockMetaBuilder_.build();
         }
-        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
-          to_bitField0_ |= 0x00000020;
-        }
-        result.priority_ = priority_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4386,9 +4312,6 @@ public final class RPCProtos {
         if (other.hasCellBlockMeta()) {
           mergeCellBlockMeta(other.getCellBlockMeta());
         }
-        if (other.hasPriority()) {
-          setPriority(other.getPriority());
-        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4858,59 +4781,6 @@ public final class RPCProtos {
         return cellBlockMetaBuilder_;
       }

-      // optional uint32 priority = 6;
-      private int priority_ ;
-      /**
-       * <code>optional uint32 priority = 6;</code>
-       *
-       * <pre>
-       * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-       * See HConstants.
-       * </pre>
-       */
-      public boolean hasPriority() {
-        return ((bitField0_ & 0x00000020) == 0x00000020);
-      }
-      /**
-       * <code>optional uint32 priority = 6;</code>
-       *
-       * <pre>
-       * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-       * See HConstants.
-       * </pre>
-       */
-      public int getPriority() {
-        return priority_;
-      }
-      /**
-       * <code>optional uint32 priority = 6;</code>
-       *
-       * <pre>
-       * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-       * See HConstants.
-       * </pre>
-       */
-      public Builder setPriority(int value) {
-        bitField0_ |= 0x00000020;
-        priority_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional uint32 priority = 6;</code>
-       *
-       * <pre>
-       * 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-       * See HConstants.
-       * </pre>
-       */
-      public Builder clearPriority() {
-        bitField0_ = (bitField0_ & ~0x00000020);
-        priority_ = 0;
-        onChanged();
-        return this;
-      }

       // @@protoc_insertion_point(builder_scope:RequestHeader)
     }
@@ -5927,15 +5797,15 @@ public final class RPCProtos {
       "\001(\r\"|\n\021ExceptionResponse\022\034\n\024exception_cl" +
       "ass_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010h" +
       "ostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_re",
-      "try\030\005 \001(\010\"\246\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
+      "try\030\005 \001(\010\"\224\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
       " \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013m" +
       "ethod_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022" +
       "\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBlockMeta" +
-      "\022\020\n\010priority\030\006 \001(\r\"q\n\016ResponseHeader\022\017\n\007" +
-      "call_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022.Excep" +
-      "tionResponse\022\'\n\017cell_block_meta\030\003 \001(\0132\016." +
-      "CellBlockMetaB<\n*org.apache.hadoop.hbase" +
-      ".protobuf.generatedB\tRPCProtosH\001\240\001\001"
+      "\"q\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022%\n\te" +
+      "xception\030\002 \001(\0132\022.ExceptionResponse\022\'\n\017ce" +
+      "ll_block_meta\030\003 \001(\0132\016.CellBlockMetaB<\n*o" +
+      "rg.apache.hadoop.hbase.protobuf.generate" +
+      "dB\tRPCProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5971,7 +5841,7 @@ public final class RPCProtos {
       internal_static_RequestHeader_fieldAccessorTable = new
         com.google.protobuf.GeneratedMessage.FieldAccessorTable(
           internal_static_RequestHeader_descriptor,
-          new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", });
+          new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", });
       internal_static_ResponseHeader_descriptor =
         getDescriptor().getMessageTypes().get(5);
       internal_static_ResponseHeader_fieldAccessorTable = new

Client.proto

@@ -319,41 +319,42 @@ message CoprocessorServiceResponse {
 }

 /**
- * Mutations to run against a Region.
+ * An action that is part of MultiRequest.
+ * This is a union type - exactly one of the fields will be set.
  */
-message RegionMutation {
-  required RegionSpecifier region = 1;
-  // When set, run mutations as atomic unit.
-  optional bool atomic = 2;
-  repeated MutationProto mutation = 3;
+message MultiAction {
+  optional MutationProto mutation = 1;
+  optional Get get = 2;
 }

 /**
- * Either a Result or an Exception NameBytesPair (keyed by
- * exception name whose value is the exception stringified)
- * or maybe empty if no result and no exception.
+ * An individual action result. The result will in the
+ * same order as the action in the request. If an action
+ * returns a value, it is set in value field. If it doesn't
+ * return anything, the result will be empty. If an action
+ * fails to execute due to any exception, the exception
+ * is returned as a stringified parameter.
  */
-message ResultOrException {
-  optional Result result = 1;
+message ActionResult {
+  optional Result value = 1;
   optional NameBytesPair exception = 2;
 }

 /**
- * The result of a RegionMutation.
- */
-message RegionMutationResult {
-  repeated ResultOrException resultOrException = 1;
-}
-
-/**
- * Execute a list of actions on a given region in order.
+ * You can execute a list of actions on a given region in order.
+ *
+ * If it is a list of mutate actions, atomic can be set
+ * to make sure they can be processed atomically, just like
+ * RowMutations.
  */
 message MultiRequest {
-  repeated RegionMutation regionMutation = 1;
+  required RegionSpecifier region = 1;
+  repeated MultiAction action = 2;
+  optional bool atomic = 3;
 }

 message MultiResponse {
-  repeated RegionMutationResult regionMutationResult = 1;
+  repeated ActionResult result = 1;
 }
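
The restored MultiAction is a hand-rolled union: the comment promises exactly one of mutation or get is set, but proto2 optional fields cannot enforce that, so client and server check the has-bits and dispatch by convention. A hypothetical sketch of that dispatch (the record mimics the generated hasMutation()/hasGet() accessors; none of these names are the real generated API):

// Hypothetical mirror of dispatching on the MultiAction union arms.
public class UnionDispatchSketch {
  record MultiAction(String mutation, String get) {
    boolean hasMutation() { return mutation != null; }
    boolean hasGet() { return get != null; }
  }

  static String dispatch(MultiAction a) {
    // "Exactly one set" is a convention the schema itself cannot enforce.
    if (a.hasMutation()) return "apply mutation to row " + a.mutation();
    if (a.hasGet()) return "serve get for row " + a.get();
    throw new IllegalStateException("empty MultiAction");
  }

  public static void main(String[] args) {
    System.out.println(dispatch(new MultiAction("row1", null)));
    System.out.println(dispatch(new MultiAction(null, "row2")));
  }
}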

RPC.proto

@@ -119,9 +119,7 @@ message RequestHeader {
   optional bool request_param = 4;
   // If present, then an encoded data block follows.
   optional CellBlockMeta cell_block_meta = 5;
-  // 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
-  // See HConstants.
-  optional uint32 priority = 6;
+  // TODO: Have client specify priority
 }

 message ResponseHeader {

AnnotationReadingPriorityFunction.java

@@ -83,7 +83,8 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
       CompactRegionRequest.class,
       GetRequest.class,
       MutateRequest.class,
-      ScanRequest.class
+      ScanRequest.class,
+      MultiRequest.class
   };

   // Some caches for helping performance
@@ -100,7 +101,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
       if (p != null) {
         // Since we protobuf'd, and then subsequently, when we went with pb style, method names
         // are capitalized. This meant that this brittle compare of method names gotten by
-        // reflection no longer matched the method names coming in over pb. TODO: Get rid of this
+        // reflection no longer matched the method names comeing in over pb. TODO: Get rid of this
        // check. For now, workaround is to capitalize the names we got from reflection so they
        // have chance of matching the pb ones.
        String capitalizedMethodName = capitalize(m.getName());
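
The workaround those comments describe is just upper-casing the first character of the reflected name so "multi" can match the pb-style "Multi". A hypothetical equivalent of the private capitalize() helper the line above calls (the real implementation is not shown in this diff):

public class CapitalizeSketch {
  // Hypothetical equivalent of the capitalize() helper referenced above.
  static String capitalize(String s) {
    if (s == null || s.isEmpty()) return s;
    return Character.toUpperCase(s.charAt(0)) + s.substring(1);
  }

  public static void main(String[] args) {
    System.out.println(capitalize("multi"));  // Multi
  }
}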
@@ -108,6 +109,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
       }
     }
     this.annotatedQos = qosMap;
+
     if (methodMap.get("getRegion") == null) {
       methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
       methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
@@ -146,14 +148,10 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
     if (priorityByAnnotation != null) {
       return priorityByAnnotation;
     }
+
     if (param == null) {
       return HConstants.NORMAL_QOS;
     }
-    if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
-      // The multi call has its priority set in the header. All calls should work this way but
-      // only this one has been converted so far. No priority == NORMAL_QOS.
-      return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
-    }
     String cls = param.getClass().getName();
     Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
     RegionSpecifier regionSpecifier = null;
@@ -203,4 +201,4 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
   void setRegionServer(final HRegionServer hrs) {
     this.hRegionServer = hrs;
   }
 }

HRegion.java

@@ -73,16 +73,16 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -113,9 +113,11 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;

View File

@ -18,6 +18,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
@ -36,20 +38,18 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.HealthCheckChore;
@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@ -118,6 +120,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -135,6 +138,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRespon
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
@ -148,8 +152,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
@ -166,9 +170,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutation;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutationResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
@ -184,6 +185,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@ -3303,115 +3305,108 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// It is also the conduit via which we pass back data. // It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null ? controller.cellScanner(): null; CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
// Clear the scanner so we are not holding on to a reference across the call.
if (controller != null) controller.setCellScanner(null); if (controller != null) controller.setCellScanner(null);
List<CellScannable> cellsToReturn = null; List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
try { try {
for (RegionMutation regionMutation: request.getRegionMutationList()) { HRegion region = getRegion(request.getRegion());
RegionMutationResult.Builder regionMutationResultBuilder = null; MultiResponse.Builder builder = MultiResponse.newBuilder();
HRegion region = getRegion(regionMutation.getRegion()); List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
if (regionMutation.hasAtomic() && regionMutation.getAtomic()) { // Do a bunch of mutations atomically. Mutations are Puts and Deletes. NOT Gets.
this.requestCount.increment(); if (request.hasAtomic() && request.getAtomic()) {
mutateRows(region, regionMutation.getMutationList(), cellScanner); // MultiAction is union type. Has a Get or a Mutate.
} else { for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
regionMutationResultBuilder = RegionMutationResult.newBuilder(); if (actionUnion.hasMutation()) {
cellsToReturn = doNonAtomicRegionMutation(region, regionMutation, cellScanner, mutations.add(actionUnion.getMutation());
regionMutationResultBuilder, cellsToReturn); } else {
throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
}
}
// TODO: We are not updating a metric here. Should we up requestCount?
if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
} else {
// Do a bunch of Actions.
ActionResult.Builder resultBuilder = null;
cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
this.requestCount.increment();
ClientProtos.Result result = null;
try {
if (actionUnion.hasGet()) {
Get get = ProtobufUtil.toGet(actionUnion.getGet());
Result r = region.get(get);
if (r != null) {
// Get a result with no data. The data will be carried alongside pbs, not as pbs.
result = ProtobufUtil.toResultNoData(r);
// Add the Result to controller so it gets serialized apart from pb. Get
// Results could be big so good if they are not serialized as pb.
cellsToReturn.add(r);
}
} else if (actionUnion.hasMutation()) {
MutationProto mutation = actionUnion.getMutation();
MutationType type = mutation.getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE) {
if (!mutations.isEmpty()) {
doBatchOp(builder, region, mutations, cellScanner);
mutations.clear();
} else if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
}
Result r = null;
switch (type) {
case APPEND:
r = append(region, mutation, cellScanner);
break;
case INCREMENT:
r = increment(region, mutation, cellScanner);
break;
case PUT:
case DELETE:
mutations.add(mutation);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (r != null) {
// Put the data into the cellsToReturn and the metadata about the result is all that
// we will pass back in the protobuf result.
result = ProtobufUtil.toResultNoData(r);
cellsToReturn.add(r);
}
} else {
LOG.warn("Error: invalid action: " + actionUnion + ". "
+ "it must be a Get, Mutate, or Exec.");
throw new DoNotRetryIOException("Invalid action, "
+ "it must be a Get, Mutate, or Exec.");
}
if (result != null) {
if (resultBuilder == null) {
resultBuilder = ActionResult.newBuilder();
} else {
resultBuilder.clear();
}
resultBuilder.setValue(result);
builder.addResult(resultBuilder.build());
}
} catch (IOException ie) {
builder.addResult(ResponseConverter.buildActionResult(ie));
}
}
if (!mutations.isEmpty()) {
doBatchOp(builder, region, mutations, cellScanner);
} }
// Have one RegionMutationResult per RegionMutation, even if it is empty, so we keep results
// aligned with how the requests came in.
responseBuilder.addRegionMutationResult(regionMutationResultBuilder == null?
RegionMutationResult.getDefaultInstance(): regionMutationResultBuilder.build());
} }
// Load the controller with the Cells to return. // Load the controller with the Cells to return.
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
} }
return responseBuilder.build(); return builder.build();
} catch (IOException ie) { } catch (IOException ie) {
throw new ServiceException(ie); throw new ServiceException(ie);
} }
} }
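The multi() body above treats the RPC controller as a two-way conduit: the request's cell payload is detached up front so no reference is held across the call, and any cells to return are attached to the same controller just before the response goes out. Below is a minimal, self-contained sketch of that detach/process/reattach pattern; Controller, serve, and the String payloads are illustrative stand-ins, not the real PayloadCarryingRpcController or CellScanner API.

import java.util.ArrayList;
import java.util.List;

public class ConduitSketch {
  // Stand-in for PayloadCarryingRpcController: carries an out-of-band cell
  // payload next to the protobuf request and response.
  static class Controller {
    private List<String> cells;
    List<String> detachCells() { List<String> c = this.cells; this.cells = null; return c; }
    void attachCells(List<String> c) { this.cells = c; }
    List<String> cells() { return this.cells; }
  }

  static List<String> serve(Controller controller, List<String> actions) {
    // Detach the inbound payload up front so no reference is held across the call.
    List<String> inbound = controller == null ? null : controller.detachCells();
    List<String> cellsToReturn = new ArrayList<>();
    List<String> response = new ArrayList<>();
    for (String action : actions) {
      // A real server would decode 'inbound' here; we just note its presence.
      String suffix = inbound == null ? "" : "+payload";
      cellsToReturn.add(action + ":data" + suffix); // big data rides the side channel
      response.add(action + ":meta");               // only metadata goes in the "pb" response
    }
    if (controller != null && !cellsToReturn.isEmpty()) {
      controller.attachCells(cellsToReturn);        // load the controller for the reply
    }
    return response;
  }

  public static void main(String[] args) {
    Controller controller = new Controller();
    controller.attachCells(List.of("req-cell"));
    System.out.println(serve(controller, List.of("get1", "put1"))); // [get1:meta, put1:meta]
    System.out.println(controller.cells()); // [get1:data+payload, put1:data+payload]
  }
}

Keeping big values out of the protobuf response is the design point the code comments call out: result data travels as a raw cell block alongside the message rather than being serialized as pb.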
/**
* Run through the mutations in the regionMutation <code>rm</code>; for each Mutation, do the
* work, and when done, add an instance of a {@link ResultOrException} that corresponds to it.
* @param region
* @param rm
* @param cellScanner
* @param builder
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
* method returns as a 'result'.
* @return The <code>cellsToReturn</code> list, possibly allocated in this method.
*/
private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
final RegionMutation rm, final CellScanner cellScanner,
final RegionMutationResult.Builder builder, List<CellScannable> cellsToReturn) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. The idea is that rather than
// doing them one at a time, we pass them on in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is added down in the doBatchOp
// call; results stay aligned with the request order even though Puts and Deletes are deferred.
List<MutationProto> mutations = null;
for (ClientProtos.MutationProto m: rm.getMutationList()) {
ClientProtos.ResultOrException resultOrException = null;
try {
Result r = null;
MutationType type = m.getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
doBatchOp(builder, region, mutations, cellScanner);
mutations.clear();
}
switch (type) {
case APPEND:
r = append(region, m, cellScanner);
break;
case INCREMENT:
r = increment(region, m, cellScanner);
break;
case PUT:
case DELETE:
// Collect the individual mutations and apply in a batch
if (mutations == null) mutations =
new ArrayList<MutationProto>(rm.getMutationCount());
mutations.add(m);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (r != null) {
ClientProtos.Result pbResult = null;
if (isClientCellBlockSupport()) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to know the size in advance; just make a rough guess.
if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>(256);
cellsToReturn.add(r);
} else {
pbResult = ProtobufUtil.toResult(r);
}
resultOrException =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult).build();
}
// We can reach this point with no result and no exception. That presumes the Mutation was
// a Put or Delete added to the collecting mutations List for batching later. In that
// case the corresponding ResultOrException instance for the Put or Delete will be added
// down in the doBatchOp method call rather than up here.
} catch (IOException ie) {
resultOrException = ResultOrException.newBuilder().
setException(ResponseConverter.buildException(ie)).build();
}
if (resultOrException != null) {
builder.addResultOrException(resultOrException);
}
}
// Finish up any outstanding mutations
if (mutations != null && !mutations.isEmpty()) {
doBatchOp(builder, region, mutations, cellScanner);
}
return cellsToReturn;
}
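Reduced to its core, doNonAtomicRegionMutation walks the actions in order, buffers contiguous PUT/DELETE runs, flushes the buffer whenever a non-batchable action (or the end of input) arrives, and emits one result per action in request order. A self-contained sketch of that pattern, with toy types standing in for the protobuf classes (all names here are hypothetical):

import java.util.ArrayList;
import java.util.List;

public class BatchRuns {
  enum Type { PUT, DELETE, APPEND, INCREMENT }

  static List<String> apply(List<Type> actions) {
    List<String> results = new ArrayList<>();
    List<Type> run = new ArrayList<>();          // current contiguous PUT/DELETE run
    for (Type t : actions) {
      if (t != Type.PUT && t != Type.DELETE && !run.isEmpty()) {
        flush(run, results);                     // flush before the non-batchable action
      }
      switch (t) {
        case PUT:
        case DELETE:
          run.add(t);                            // defer; applied as one batch later
          break;
        default:
          results.add(t + ":done");              // APPEND/INCREMENT run one at a time
          break;
      }
    }
    if (!run.isEmpty()) flush(run, results);     // finish any outstanding run
    return results;
  }

  static void flush(List<Type> run, List<String> results) {
    // One batched operation; one result per deferred mutation, in request order.
    for (Type t : run) results.add(t + ":batched");
    run.clear();
  }

  public static void main(String[] args) {
    System.out.println(apply(List.of(Type.PUT, Type.PUT, Type.INCREMENT, Type.DELETE)));
    // [PUT:batched, PUT:batched, INCREMENT:done, DELETE:batched]
  }
}

The flush-before-switch step is what keeps the i-th result answering the i-th action even though the writes themselves are applied out of band.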
// End Client methods // End Client methods
// Start Admin methods // Start Admin methods
@ -3887,10 +3882,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
try { try {
checkOpen(); checkOpen();
List<WALEntry> entries = request.getEntryList(); List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) { if(entries == null || entries.isEmpty()) {
// empty input // empty input
return ReplicateWALEntryResponse.newBuilder().build(); return ReplicateWALEntryResponse.newBuilder().build();
} }
HRegion region = this.getRegionByEncodedName( HRegion region = this.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
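replicateWALEntry assumes the caller has already grouped entries by region: empty input short-circuits with an empty response, and otherwise the target region is resolved once, from the first entry's encoded region name. A miniature of that guard-then-resolve shape, with invented types in place of WALEntry and HRegion:

import java.util.List;

public class ReplaySinkGuard {
  record Entry(String encodedRegionName) {}

  static String replicate(List<Entry> entries) {
    if (entries == null || entries.isEmpty()) {
      return "empty-response";   // nothing to replay; answer with an empty response
    }
    // Entries in one call are all destined for the same region, so the
    // first entry names the region for the whole batch.
    String region = entries.get(0).encodedRegionName();
    return "replayed " + entries.size() + " entries into region " + region;
  }

  public static void main(String[] args) {
    System.out.println(replicate(List.of(new Entry("abc123"), new Entry("abc123"))));
    System.out.println(replicate(List.of())); // empty-response
  }
}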
@ -4071,13 +4067,15 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
* @param region * @param region
* @param mutations * @param mutations
*/ */
protected void doBatchOp(final RegionMutationResult.Builder builder, final HRegion region, protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
final List<MutationProto> mutations, final CellScanner cells) { final List<MutationProto> mutations, final CellScanner cells) {
Mutation[] mArray = new Mutation[mutations.size()]; Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis(); long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false; boolean batchContainsPuts = false, batchContainsDelete = false;
ResultOrException resultOrException = null;
try { try {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
ActionResult result = resultBuilder.build();
int i = 0; int i = 0;
for (MutationProto m : mutations) { for (MutationProto m : mutations) {
Mutation mutation; Mutation mutation;
@ -4089,6 +4087,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
batchContainsDelete = true; batchContainsDelete = true;
} }
mArray[i++] = mutation; mArray[i++] = mutation;
builder.addResult(result);
} }
requestCount.add(mutations.size()); requestCount.add(mutations.size());
@ -4100,21 +4099,21 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
for (i = 0; i < codes.length; i++) { for (i = 0; i < codes.length; i++) {
switch (codes[i].getOperationStatusCode()) { switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY: case BAD_FAMILY:
resultOrException = ResponseConverter.buildActionResult( result = ResponseConverter.buildActionResult(
new NoSuchColumnFamilyException(codes[i].getExceptionMsg())); new NoSuchColumnFamilyException(codes[i].getExceptionMsg()));
builder.setResultOrException(i, resultOrException); builder.setResult(i, result);
break; break;
case SANITY_CHECK_FAILURE: case SANITY_CHECK_FAILURE:
resultOrException = ResponseConverter.buildActionResult( result = ResponseConverter.buildActionResult(
new FailedSanityCheckException(codes[i].getExceptionMsg())); new FailedSanityCheckException(codes[i].getExceptionMsg()));
builder.setResultOrException(i, resultOrException); builder.setResult(i, result);
break; break;
default: default:
resultOrException = ResponseConverter.buildActionResult( result = ResponseConverter.buildActionResult(
new DoNotRetryIOException(codes[i].getExceptionMsg())); new DoNotRetryIOException(codes[i].getExceptionMsg()));
builder.setResultOrException(i, resultOrException); builder.setResult(i, result);
break; break;
case SUCCESS: case SUCCESS:
@ -4122,9 +4121,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
} }
} }
} catch (IOException ie) { } catch (IOException ie) {
resultOrException = ResponseConverter.buildActionResult(ie); ActionResult result = ResponseConverter.buildActionResult(ie);
for (int i = 0; i < mutations.size(); i++) { for (int i = 0; i < mutations.size(); i++) {
builder.setResultOrException(i, resultOrException); builder.setResult(i, result);
} }
} }
long after = EnvironmentEdgeManager.currentTimeMillis(); long after = EnvironmentEdgeManager.currentTimeMillis();
@ -4146,9 +4145,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
* exceptionMessage if any * exceptionMessage if any
* @throws IOException * @throws IOException
*/ */
protected OperationStatus [] doBatchOp(final HRegion region, protected OperationStatus[] doBatchOp(final HRegion region,
final List<Pair<MutationType, Mutation>> mutations, boolean isReplay) final List<Pair<MutationType, Mutation>> mutations, boolean isReplay) throws IOException {
throws IOException {
Mutation[] mArray = new Mutation[mutations.size()]; Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis(); long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false; boolean batchContainsPuts = false, batchContainsDelete = false;
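The result handling in doBatchOp above is index-aligned: each OperationStatusCode coming back from the region's batch call is translated into a success or a typed exception at the same position, and a whole-batch IOException fills every slot with the same error. A small sketch of that mapping, using plain strings where the real code builds ActionResult/ResultOrException messages (names are stand-ins):

import java.util.Arrays;

public class IndexAlignedResults {
  enum Code { SUCCESS, BAD_FAMILY, SANITY_CHECK_FAILURE, FAILURE }

  // One result slot per mutation index, in the same order as the batch.
  static String[] toResults(Code[] codes) {
    String[] results = new String[codes.length];
    for (int i = 0; i < codes.length; i++) {
      switch (codes[i]) {
        case SUCCESS:              results[i] = "ok"; break;
        case BAD_FAMILY:           results[i] = "NoSuchColumnFamilyException"; break;
        case SANITY_CHECK_FAILURE: results[i] = "FailedSanityCheckException"; break;
        default:                   results[i] = "DoNotRetryIOException"; break;
      }
    }
    return results;
  }

  // A batch-wide IOException puts the same error into every slot.
  static String[] wholeBatchFailure(int n, String error) {
    String[] results = new String[n];
    Arrays.fill(results, error);
    return results;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(
        toResults(new Code[] { Code.SUCCESS, Code.BAD_FAMILY, Code.FAILURE })));
    System.out.println(Arrays.toString(wholeBatchFailure(3, "io-error")));
  }
}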


@ -33,14 +33,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -51,10 +53,10 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryR
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionMutationResult; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -116,7 +118,7 @@ public class WALEditsReplaySink {
HRegionLocation loc = null; HRegionLocation loc = null;
HLog.Entry entry = null; HLog.Entry entry = null;
List<HLog.Entry> regionEntries = null; List<HLog.Entry> regionEntries = null;
// Build the action list. // Build the action list.
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
loc = entries.get(i).getFirst(); loc = entries.get(i).getFirst();
entry = entries.get(i).getSecond(); entry = entries.get(i).getSecond();
@ -128,7 +130,7 @@ public class WALEditsReplaySink {
} }
regionEntries.add(entry); regionEntries.add(entry);
} }
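The loop above buckets WAL entries by region before anything is replayed: for each (location, entry) pair, look up or create the per-region list and append. The same group-by step in isolation, with strings standing in for HRegionLocation and HLog.Entry:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByRegion {
  static Map<String, List<String>> group(List<String[]> locEntryPairs) {
    Map<String, List<String>> entriesByRegion = new HashMap<>();
    for (String[] pair : locEntryPairs) {
      String region = pair[0], entry = pair[1];
      // computeIfAbsent plays the "create the list on first sight" branch above.
      entriesByRegion.computeIfAbsent(region, r -> new ArrayList<>()).add(entry);
    }
    return entriesByRegion;
  }

  public static void main(String[] args) {
    System.out.println(group(List.of(
        new String[] {"r1", "e1"},
        new String[] {"r2", "e2"},
        new String[] {"r1", "e3"})));  // {r1=[e1, e3], r2=[e2]}
  }
}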
long startTime = EnvironmentEdgeManager.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
// replaying edits by region // replaying edits by region
@ -141,7 +143,7 @@ public class WALEditsReplaySink {
for (; replayedActions < totalActions;) { for (; replayedActions < totalActions;) {
curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
: (totalActions - replayedActions); : (totalActions - replayedActions);
replayEdits(loc, curRegion, allActions.subList(replayedActions, replayEdits(loc, curRegion, allActions.subList(replayedActions,
replayedActions + curBatchSize)); replayedActions + curBatchSize));
replayedActions += curBatchSize; replayedActions += curBatchSize;
} }
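This replay loop walks a region's actions in fixed-size slices: each pass takes at most MAX_BATCH_SIZE actions via subList and advances the cursor, so an arbitrarily large edit list becomes a series of bounded replay calls. The chunking on its own, with an illustrative cap (the real constant lives in WALEditsReplaySink):

import java.util.ArrayList;
import java.util.List;

public class ChunkedReplay {
  static final int MAX_BATCH_SIZE = 256; // illustrative cap per replay call

  static int replayInChunks(List<Integer> allActions) {
    int calls = 0;
    int replayedActions = 0;
    int totalActions = allActions.size();
    while (replayedActions < totalActions) {
      int curBatchSize = Math.min(MAX_BATCH_SIZE, totalActions - replayedActions);
      List<Integer> slice = allActions.subList(replayedActions, replayedActions + curBatchSize);
      calls++;                          // stand-in for replayEdits(loc, curRegion, slice)
      replayedActions += slice.size();
    }
    return calls;
  }

  public static void main(String[] args) {
    List<Integer> actions = new ArrayList<>();
    for (int i = 0; i < 600; i++) actions.add(i);
    System.out.println(replayInChunks(actions)); // 3 calls: 256 + 256 + 88
  }
}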
@ -183,7 +185,7 @@ public class WALEditsReplaySink {
} }
} }
} }
/** /**
* Callable that handles the <code>replay</code> method call going against a single regionserver * Callable that handles the <code>replay</code> method call going against a single regionserver
* @param <R> * @param <R>
@ -200,7 +202,7 @@ public class WALEditsReplaySink {
this.regionInfo = regionInfo; this.regionInfo = regionInfo;
setLocation(regionLoc); setLocation(regionLoc);
} }
@Override @Override
public ReplicateWALEntryResponse call() throws IOException { public ReplicateWALEntryResponse call() throws IOException {
try { try {


@ -20,8 +20,8 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -43,19 +43,11 @@ public class TestQosFunction {
checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
// Set method name in pb style with the method name capitalized. // Set method name in pb style with the method name capitalized.
checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction); checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction);
// Check multi works.
checkMethod("Multi", HConstants.NORMAL_QOS, qosFunction, MultiRequest.getDefaultInstance());
} }
private void checkMethod(final String methodName, final int expected, private void checkMethod(final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf) {
final AnnotationReadingPriorityFunction qosf) {
checkMethod(methodName, expected, qosf, null);
}
private void checkMethod(final String methodName, final int expected,
final AnnotationReadingPriorityFunction qosf, final Message param) {
RequestHeader.Builder builder = RequestHeader.newBuilder(); RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setMethodName(methodName); builder.setMethodName(methodName);
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param)); assertEquals(methodName, expected, qosf.getPriority(builder.build(), null));
} }
} }
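The test drives the priority function with nothing more than a method name: build a RequestHeader, hand it to the function, and assert the expected QOS level. A dependency-free sketch of the same check; the priority table and constant values below are invented for illustration, while the real ones come from HConstants and the annotated region server methods:

import java.util.Map;

public class QosCheckSketch {
  // Illustrative priority levels; the real ones are HConstants.NORMAL_QOS etc.
  static final int NORMAL_QOS = 0, REPLICATION_QOS = 5, HIGH_QOS = 100;

  // Stand-in for AnnotationReadingPriorityFunction: method name -> priority.
  static int getPriority(String methodName) {
    return Map.of(
        "ReplicateWALEntry", REPLICATION_QOS,
        "OpenRegion", HIGH_QOS)
        .getOrDefault(methodName, NORMAL_QOS);
  }

  static void checkMethod(String methodName, int expected) {
    if (getPriority(methodName) != expected) {
      throw new AssertionError(methodName + " got unexpected priority");
    }
  }

  public static void main(String[] args) {
    checkMethod("ReplicateWALEntry", REPLICATION_QOS);
    checkMethod("OpenRegion", HIGH_QOS);
    checkMethod("Multi", NORMAL_QOS); // unannotated methods fall back to normal priority
    System.out.println("all priorities as expected");
  }
}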