HBASE-15214 Valid mutate Ops fail with RPC Codec in use and region moves across.

This commit is contained in:
anoopsjohn 2016-02-06 02:40:49 +05:30
parent 4265bf275f
commit 7239056c78
3 changed files with 45 additions and 11 deletions

View File

@ -543,7 +543,7 @@ public final class ProtobufUtil {
MutationType type = proto.getMutateType(); MutationType type = proto.getMutateType();
assert type == MutationType.PUT: type.name(); assert type == MutationType.PUT: type.name();
long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP; long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP;
Put put = null; Put put = proto.hasRow() ? new Put(proto.getRow().toByteArray(), timestamp) : null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) { if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner. // The proto has metadata only and the data is separate to be found in the cellScanner.
@ -563,9 +563,7 @@ public final class ProtobufUtil {
put.add(cell); put.add(cell);
} }
} else { } else {
if (proto.hasRow()) { if (put == null) {
put = new Put(proto.getRow().asReadOnlyByteBuffer(), timestamp);
} else {
throw new IllegalArgumentException("row cannot be null"); throw new IllegalArgumentException("row cannot be null");
} }
// The proto has the metadata and the data itself // The proto has the metadata and the data itself
@ -639,12 +637,8 @@ public final class ProtobufUtil {
throws IOException { throws IOException {
MutationType type = proto.getMutateType(); MutationType type = proto.getMutateType();
assert type == MutationType.DELETE : type.name(); assert type == MutationType.DELETE : type.name();
byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; long timestamp = proto.hasTimestamp() ? proto.getTimestamp() : HConstants.LATEST_TIMESTAMP;
long timestamp = HConstants.LATEST_TIMESTAMP; Delete delete = proto.hasRow() ? new Delete(proto.getRow().toByteArray(), timestamp) : null;
if (proto.hasTimestamp()) {
timestamp = proto.getTimestamp();
}
Delete delete = null;
int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
if (cellCount > 0) { if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner. // The proto has metadata only and the data is separate to be found in the cellScanner.
@ -667,7 +661,9 @@ public final class ProtobufUtil {
delete.addDeleteMarker(cell); delete.addDeleteMarker(cell);
} }
} else { } else {
delete = new Delete(row, timestamp); if (delete == null) {
throw new IllegalArgumentException("row cannot be null");
}
for (ColumnValue column: proto.getColumnValueList()) { for (ColumnValue column: proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray(); byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv: column.getQualifierValueList()) { for (QualifierValue qv: column.getQualifierValueList()) {

View File

@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
@ -696,6 +697,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
setException(ResponseConverter.buildException(sizeIOE)); setException(ResponseConverter.buildException(sizeIOE));
resultOrExceptionBuilder.setIndex(action.getIndex()); resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build()); builder.addResultOrException(resultOrExceptionBuilder.build());
if (cellScanner != null) {
skipCellsForMutation(action, cellScanner);
}
continue; continue;
} }
if (action.hasGet()) { if (action.hasGet()) {
@ -2239,6 +2243,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer.getMetrics().exception(e); rpcServer.getMetrics().exception(e);
regionActionResultBuilder.setException(ResponseConverter.buildException(e)); regionActionResultBuilder.setException(ResponseConverter.buildException(e));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
// All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations.
if (cellScanner != null) {
skipCellsForMutations(regionAction.getActionList(), cellScanner);
}
continue; // For this region it's a failure. continue; // For this region it's a failure.
} }
@ -2296,6 +2306,30 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return responseBuilder.build(); return responseBuilder.build();
} }
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
for (Action action : actions) {
skipCellsForMutation(action, cellScanner);
}
}
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
try {
if (action.hasMutation()) {
MutationProto m = action.getMutation();
if (m.hasAssociatedCellCount()) {
for (int i = 0; i < m.getAssociatedCellCount(); i++) {
cellScanner.advance();
}
}
}
} catch (IOException e) {
// No need to handle these Individual Muatation level issue. Any way this entire RegionAction
// marked as failed as we could not see the Region here. At client side the top level
// RegionAction exception will be considered first.
LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
}
}
/** /**
* Mutate data in a table. * Mutate data in a table.
* *

View File

@ -35,10 +35,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -75,6 +77,8 @@ public class TestMultiParallel {
//((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
//((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodec.class.getCanonicalName());
UTIL.startMiniCluster(slaves); UTIL.startMiniCluster(slaves);
Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
UTIL.waitTableEnabled(TEST_TABLE); UTIL.waitTableEnabled(TEST_TABLE);