HBASE-18962 Support atomic (all or none) BatchOperations through batchMutate()
Signed-off-by: Apekshit Sharma <appy@apache.org> Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
07b0ac4161
commit
85227d6a72
|
@ -2952,6 +2952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
protected final ObservedExceptionsInBatch observedExceptions;
|
protected final ObservedExceptionsInBatch observedExceptions;
|
||||||
//Durability of the batch (highest durability of all operations)
|
//Durability of the batch (highest durability of all operations)
|
||||||
protected Durability durability;
|
protected Durability durability;
|
||||||
|
protected boolean atomic = false;
|
||||||
|
|
||||||
public BatchOperation(final HRegion region, T[] operations) {
|
public BatchOperation(final HRegion region, T[] operations) {
|
||||||
this.operations = operations;
|
this.operations = operations;
|
||||||
|
@ -3067,6 +3068,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
return getMutation(0).getClusterIds();
|
return getMutation(0).getClusterIds();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isAtomic() {
|
||||||
|
return atomic;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method that checks and prepares only one mutation. This can be used to implement
|
* Helper method that checks and prepares only one mutation. This can be used to implement
|
||||||
* {@link #checkAndPrepare()} for entire Batch.
|
* {@link #checkAndPrepare()} for entire Batch.
|
||||||
|
@ -3097,16 +3102,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
if (tmpDur.ordinal() > durability.ordinal()) {
|
if (tmpDur.ordinal() > durability.ordinal()) {
|
||||||
durability = tmpDur;
|
durability = tmpDur;
|
||||||
}
|
}
|
||||||
} catch (NoSuchColumnFamilyException nscf) {
|
} catch (NoSuchColumnFamilyException nscfe) {
|
||||||
final String msg = "No such column family in batch mutation. ";
|
final String msg = "No such column family in batch mutation. ";
|
||||||
if (observedExceptions.hasSeenNoSuchFamily()) {
|
if (observedExceptions.hasSeenNoSuchFamily()) {
|
||||||
LOG.warn(msg + nscf.getMessage());
|
LOG.warn(msg + nscfe.getMessage());
|
||||||
} else {
|
} else {
|
||||||
LOG.warn(msg, nscf);
|
LOG.warn(msg, nscfe);
|
||||||
observedExceptions.sawNoSuchFamily();
|
observedExceptions.sawNoSuchFamily();
|
||||||
}
|
}
|
||||||
retCodeDetails[index] = new OperationStatus(
|
retCodeDetails[index] = new OperationStatus(
|
||||||
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
|
OperationStatusCode.BAD_FAMILY, nscfe.getMessage());
|
||||||
|
if (isAtomic()) { // fail, atomic means all or none
|
||||||
|
throw nscfe;
|
||||||
|
}
|
||||||
} catch (FailedSanityCheckException fsce) {
|
} catch (FailedSanityCheckException fsce) {
|
||||||
final String msg = "Batch Mutation did not pass sanity check. ";
|
final String msg = "Batch Mutation did not pass sanity check. ";
|
||||||
if (observedExceptions.hasSeenFailedSanityCheck()) {
|
if (observedExceptions.hasSeenFailedSanityCheck()) {
|
||||||
|
@ -3117,6 +3125,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
retCodeDetails[index] = new OperationStatus(
|
retCodeDetails[index] = new OperationStatus(
|
||||||
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
|
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
|
||||||
|
if (isAtomic()) {
|
||||||
|
throw fsce;
|
||||||
|
}
|
||||||
} catch (WrongRegionException we) {
|
} catch (WrongRegionException we) {
|
||||||
final String msg = "Batch mutation had a row that does not belong to this region. ";
|
final String msg = "Batch mutation had a row that does not belong to this region. ";
|
||||||
if (observedExceptions.hasSeenWrongRegion()) {
|
if (observedExceptions.hasSeenWrongRegion()) {
|
||||||
|
@ -3127,6 +3138,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
retCodeDetails[index] = new OperationStatus(
|
retCodeDetails[index] = new OperationStatus(
|
||||||
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
|
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
|
||||||
|
if (isAtomic()) {
|
||||||
|
throw we;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3150,15 +3164,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
// If we haven't got any rows in our batch, we should block to get the next one.
|
// If we haven't got any rows in our batch, we should block to get the next one.
|
||||||
RowLock rowLock = null;
|
RowLock rowLock = null;
|
||||||
try {
|
try {
|
||||||
rowLock = region.getRowLockInternal(mutation.getRow(), true);
|
// if atomic then get exclusive lock, else shared lock
|
||||||
|
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());
|
||||||
} catch (TimeoutIOException e) {
|
} catch (TimeoutIOException e) {
|
||||||
// We will retry when other exceptions, but we should stop if we timeout .
|
// We will retry when other exceptions, but we should stop if we timeout .
|
||||||
throw e;
|
throw e;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
||||||
|
if (isAtomic()) { // fail, atomic means all or none
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (rowLock == null) {
|
if (rowLock == null) {
|
||||||
// We failed to grab another lock
|
// We failed to grab another lock
|
||||||
|
if (isAtomic()) {
|
||||||
|
throw new IOException("Can't apply all operations atomically!");
|
||||||
|
}
|
||||||
break; // Stop acquiring more rows for this batch
|
break; // Stop acquiring more rows for this batch
|
||||||
} else {
|
} else {
|
||||||
acquiredRowLocks.add(rowLock);
|
acquiredRowLocks.add(rowLock);
|
||||||
|
@ -3279,12 +3300,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
* Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most
|
* Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most
|
||||||
* of the logic is same.
|
* of the logic is same.
|
||||||
*/
|
*/
|
||||||
private static class MutationBatchOperation extends BatchOperation<Mutation> {
|
static class MutationBatchOperation extends BatchOperation<Mutation> {
|
||||||
private long nonceGroup;
|
private long nonceGroup;
|
||||||
private long nonce;
|
private long nonce;
|
||||||
public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,
|
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
|
||||||
long nonce) {
|
long nonceGroup, long nonce) {
|
||||||
super(region, operations);
|
super(region, operations);
|
||||||
|
this.atomic = atomic;
|
||||||
this.nonceGroup = nonceGroup;
|
this.nonceGroup = nonceGroup;
|
||||||
this.nonce = nonce;
|
this.nonce = nonce;
|
||||||
}
|
}
|
||||||
|
@ -3522,11 +3544,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
retCodeDetails[index] = OperationStatus.SUCCESS;
|
retCodeDetails[index] = OperationStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
String msg = "Put/Delete mutations only supported in a batch";
|
||||||
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
|
// In case of passing Append mutations along with the Puts and Deletes in batchMutate
|
||||||
// mark the operation return code as failure so that it will not be considered in
|
// mark the operation return code as failure so that it will not be considered in
|
||||||
// the doMiniBatchMutation
|
// the doMiniBatchMutation
|
||||||
retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE,
|
retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg);
|
||||||
"Put/Delete mutations only supported in batchMutate() now");
|
|
||||||
|
if (isAtomic()) { // fail, atomic means all or none
|
||||||
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3582,7 +3608,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
|
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
|
||||||
* of the logic is same.
|
* of the logic is same.
|
||||||
*/
|
*/
|
||||||
private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
|
static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
|
||||||
private long origLogSeqNum = 0;
|
private long origLogSeqNum = 0;
|
||||||
public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
|
public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
|
||||||
long origLogSeqNum) {
|
long origLogSeqNum) {
|
||||||
|
@ -3695,11 +3721,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
|
public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
return batchMutate(mutations, false, nonceGroup, nonce);
|
||||||
|
}
|
||||||
|
|
||||||
|
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
|
||||||
|
long nonce) throws IOException {
|
||||||
// As it stands, this is used for 3 things
|
// As it stands, this is used for 3 things
|
||||||
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
|
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
|
||||||
// * coprocessor calls (see ex. BulkDeleteEndpoint).
|
// * coprocessor calls (see ex. BulkDeleteEndpoint).
|
||||||
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
|
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
|
||||||
return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce));
|
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
|
||||||
}
|
}
|
||||||
|
|
||||||
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
|
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -604,9 +604,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mutate a list of rows atomically.
|
* Mutate a list of rows atomically.
|
||||||
*
|
|
||||||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||||
* @param comparator @throws IOException
|
|
||||||
*/
|
*/
|
||||||
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
||||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
||||||
|
@ -757,10 +755,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
/**
|
/**
|
||||||
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
|
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
|
||||||
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
|
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
|
||||||
* @param region
|
|
||||||
* @param actions
|
|
||||||
* @param cellScanner
|
|
||||||
* @param builder
|
|
||||||
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
|
* @param cellsToReturn Could be null. May be allocated in this method. This is what this
|
||||||
* method returns as a 'result'.
|
* method returns as a 'result'.
|
||||||
* @param closeCallBack the callback to be used with multigets
|
* @param closeCallBack the callback to be used with multigets
|
||||||
|
@ -864,7 +858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
|
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
|
||||||
!mutations.isEmpty()) {
|
!mutations.isEmpty()) {
|
||||||
// Flush out any Puts or Deletes already collected.
|
// Flush out any Puts or Deletes already collected.
|
||||||
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
|
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
|
||||||
mutations.clear();
|
mutations.clear();
|
||||||
}
|
}
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -925,7 +919,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
// Finish up any outstanding mutations
|
// Finish up any outstanding mutations
|
||||||
if (mutations != null && !mutations.isEmpty()) {
|
if (mutations != null && !mutations.isEmpty()) {
|
||||||
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
|
try {
|
||||||
|
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
rpcServer.getMetrics().exception(ioe);
|
||||||
|
NameBytesPair pair = ResponseConverter.buildException(ioe);
|
||||||
|
resultOrExceptionBuilder.setException(pair);
|
||||||
|
context.incrementResponseExceptionSize(pair.getSerializedSize());
|
||||||
|
builder.addResultOrException(resultOrExceptionBuilder.build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return cellsToReturn;
|
return cellsToReturn;
|
||||||
}
|
}
|
||||||
|
@ -955,7 +957,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
*/
|
*/
|
||||||
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||||
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||||
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
|
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
|
||||||
|
throws IOException {
|
||||||
Mutation[] mArray = new Mutation[mutations.size()];
|
Mutation[] mArray = new Mutation[mutations.size()];
|
||||||
long before = EnvironmentEdgeManager.currentTime();
|
long before = EnvironmentEdgeManager.currentTime();
|
||||||
boolean batchContainsPuts = false, batchContainsDelete = false;
|
boolean batchContainsPuts = false, batchContainsDelete = false;
|
||||||
|
@ -967,7 +970,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
* is the mutation belong to. We can't sort ClientProtos.Action array, since they
|
* is the mutation belong to. We can't sort ClientProtos.Action array, since they
|
||||||
* are bonded to cellscanners.
|
* are bonded to cellscanners.
|
||||||
*/
|
*/
|
||||||
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<Mutation, ClientProtos.Action>();
|
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (ClientProtos.Action action: mutations) {
|
for (ClientProtos.Action action: mutations) {
|
||||||
MutationProto m = action.getMutation();
|
MutationProto m = action.getMutation();
|
||||||
|
@ -995,7 +998,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// sort to improve lock efficiency
|
// sort to improve lock efficiency
|
||||||
Arrays.sort(mArray);
|
Arrays.sort(mArray);
|
||||||
|
|
||||||
OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
|
OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
|
||||||
HConstants.NO_NONCE);
|
HConstants.NO_NONCE);
|
||||||
for (i = 0; i < codes.length; i++) {
|
for (i = 0; i < codes.length; i++) {
|
||||||
Mutation currentMutation = mArray[i];
|
Mutation currentMutation = mArray[i];
|
||||||
|
@ -1025,6 +1028,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
|
if (atomic) {
|
||||||
|
throw ie;
|
||||||
|
}
|
||||||
for (int i = 0; i < mutations.size(); i++) {
|
for (int i = 0; i < mutations.size(); i++) {
|
||||||
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
|
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
|
||||||
}
|
}
|
||||||
|
@ -1130,7 +1136,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exposed for testing
|
// Exposed for testing
|
||||||
static interface LogDelegate {
|
interface LogDelegate {
|
||||||
void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
|
void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3229,7 +3235,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
addScannerLeaseBack(lease);
|
addScannerLeaseBack(lease);
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
};
|
}
|
||||||
try {
|
try {
|
||||||
checkScanNextCallSeq(request, rsh);
|
checkScanNextCallSeq(request, rsh);
|
||||||
} catch (OutOfOrderScannerNextException e) {
|
} catch (OutOfOrderScannerNextException e) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
||||||
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
|
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
|
||||||
|
@ -165,6 +166,7 @@ import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
import org.junit.rules.TestRule;
|
import org.junit.rules.TestRule;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
@ -200,6 +202,7 @@ public class TestHRegion {
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final TestRule timeout =
|
public static final TestRule timeout =
|
||||||
CategoryBasedTimeout.forClass(TestHRegion.class);
|
CategoryBasedTimeout.forClass(TestHRegion.class);
|
||||||
|
@Rule public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
private static final String COLUMN_FAMILY = "MyCF";
|
private static final String COLUMN_FAMILY = "MyCF";
|
||||||
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
|
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
|
||||||
|
@ -215,9 +218,11 @@ public class TestHRegion {
|
||||||
// Test names
|
// Test names
|
||||||
protected TableName tableName;
|
protected TableName tableName;
|
||||||
protected String method;
|
protected String method;
|
||||||
|
protected final byte[] qual = Bytes.toBytes("qual");
|
||||||
protected final byte[] qual1 = Bytes.toBytes("qual1");
|
protected final byte[] qual1 = Bytes.toBytes("qual1");
|
||||||
protected final byte[] qual2 = Bytes.toBytes("qual2");
|
protected final byte[] qual2 = Bytes.toBytes("qual2");
|
||||||
protected final byte[] qual3 = Bytes.toBytes("qual3");
|
protected final byte[] qual3 = Bytes.toBytes("qual3");
|
||||||
|
protected final byte[] value = Bytes.toBytes("value");
|
||||||
protected final byte[] value1 = Bytes.toBytes("value1");
|
protected final byte[] value1 = Bytes.toBytes("value1");
|
||||||
protected final byte[] value2 = Bytes.toBytes("value2");
|
protected final byte[] value2 = Bytes.toBytes("value2");
|
||||||
protected final byte[] row = Bytes.toBytes("rowA");
|
protected final byte[] row = Bytes.toBytes("rowA");
|
||||||
|
@ -1522,21 +1527,10 @@ public class TestHRegion {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchPut_whileNoRowLocksHeld() throws IOException {
|
public void testBatchPut_whileNoRowLocksHeld() throws IOException {
|
||||||
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
|
final Put[] puts = new Put[10];
|
||||||
byte[] qual = Bytes.toBytes("qual");
|
|
||||||
byte[] val = Bytes.toBytes("val");
|
|
||||||
this.region = initHRegion(tableName, method, CONF, cf);
|
|
||||||
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||||
try {
|
try {
|
||||||
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
|
long syncs = prepareRegionForBachPut(puts, source, false);
|
||||||
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
|
||||||
|
|
||||||
LOG.info("First a batch put with all valid puts");
|
|
||||||
final Put[] puts = new Put[10];
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
puts[i] = new Put(Bytes.toBytes("row_" + i));
|
|
||||||
puts[i].addColumn(cf, qual, val);
|
|
||||||
}
|
|
||||||
|
|
||||||
OperationStatus[] codes = this.region.batchMutate(puts);
|
OperationStatus[] codes = this.region.batchMutate(puts);
|
||||||
assertEquals(10, codes.length);
|
assertEquals(10, codes.length);
|
||||||
|
@ -1546,7 +1540,7 @@ public class TestHRegion {
|
||||||
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
|
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
|
||||||
|
|
||||||
LOG.info("Next a batch put with one invalid family");
|
LOG.info("Next a batch put with one invalid family");
|
||||||
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
|
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
|
||||||
codes = this.region.batchMutate(puts);
|
codes = this.region.batchMutate(puts);
|
||||||
assertEquals(10, codes.length);
|
assertEquals(10, codes.length);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
|
@ -1563,21 +1557,12 @@ public class TestHRegion {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
|
public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
|
||||||
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
|
final Put[] puts = new Put[10];
|
||||||
byte[] qual = Bytes.toBytes("qual");
|
|
||||||
byte[] val = Bytes.toBytes("val");
|
|
||||||
this.region = initHRegion(tableName, method, CONF, cf);
|
|
||||||
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||||
try {
|
try {
|
||||||
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
|
long syncs = prepareRegionForBachPut(puts, source, false);
|
||||||
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
|
||||||
|
|
||||||
final Put[] puts = new Put[10];
|
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
puts[i] = new Put(Bytes.toBytes("row_" + i));
|
|
||||||
puts[i].addColumn(cf, qual, val);
|
|
||||||
}
|
|
||||||
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
|
|
||||||
|
|
||||||
LOG.info("batchPut will have to break into four batches to avoid row locks");
|
LOG.info("batchPut will have to break into four batches to avoid row locks");
|
||||||
RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
|
RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
|
||||||
|
@ -1585,7 +1570,6 @@ public class TestHRegion {
|
||||||
RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
|
RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
|
||||||
RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
|
RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
|
||||||
|
|
||||||
|
|
||||||
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
|
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
|
||||||
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
|
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
|
||||||
final CountDownLatch startingPuts = new CountDownLatch(1);
|
final CountDownLatch startingPuts = new CountDownLatch(1);
|
||||||
|
@ -1664,25 +1648,83 @@ public class TestHRegion {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchPutWithTsSlop() throws Exception {
|
public void testAtomicBatchPut() throws IOException {
|
||||||
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
|
final Put[] puts = new Put[10];
|
||||||
byte[] qual = Bytes.toBytes("qual");
|
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||||
byte[] val = Bytes.toBytes("val");
|
try {
|
||||||
|
long syncs = prepareRegionForBachPut(puts, source, false);
|
||||||
|
|
||||||
|
// 1. Straight forward case, should succeed
|
||||||
|
MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
|
||||||
|
HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
|
OperationStatus[] codes = this.region.batchMutate(batchOp);
|
||||||
|
assertEquals(10, codes.length);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||||
|
}
|
||||||
|
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
|
||||||
|
|
||||||
|
// 2. Failed to get lock
|
||||||
|
RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
|
||||||
|
// Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
|
||||||
|
// thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
|
||||||
|
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
|
||||||
|
final AtomicReference<IOException> retFromThread = new AtomicReference<>();
|
||||||
|
final CountDownLatch finishedPuts = new CountDownLatch(1);
|
||||||
|
final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
|
||||||
|
HConstants
|
||||||
|
.NO_NONCE,
|
||||||
|
HConstants.NO_NONCE);
|
||||||
|
TestThread putter = new TestThread(ctx) {
|
||||||
|
@Override
|
||||||
|
public void doWork() throws IOException {
|
||||||
|
try {
|
||||||
|
region.batchMutate(finalBatchOp);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("test failed!", ioe);
|
||||||
|
retFromThread.set(ioe);
|
||||||
|
}
|
||||||
|
finishedPuts.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
LOG.info("...starting put thread while holding locks");
|
||||||
|
ctx.addThread(putter);
|
||||||
|
ctx.startThreads();
|
||||||
|
LOG.info("...waiting for batch puts while holding locks");
|
||||||
|
try {
|
||||||
|
finishedPuts.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Interrupted!", e);
|
||||||
|
} finally {
|
||||||
|
if (lock != null) {
|
||||||
|
lock.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull(retFromThread.get());
|
||||||
|
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
|
||||||
|
|
||||||
|
// 3. Exception thrown in validation
|
||||||
|
LOG.info("Next a batch put with one invalid family");
|
||||||
|
puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
|
||||||
|
batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
|
||||||
|
HConstants.NO_NONCE);
|
||||||
|
thrown.expect(NoSuchColumnFamilyException.class);
|
||||||
|
this.region.batchMutate(batchOp);
|
||||||
|
} finally {
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
this.region = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBatchPutWithTsSlop() throws Exception {
|
||||||
// add data with a timestamp that is too recent for range. Ensure assert
|
// add data with a timestamp that is too recent for range. Ensure assert
|
||||||
CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||||
this.region = initHRegion(tableName, method, CONF, cf);
|
final Put[] puts = new Put[10];
|
||||||
|
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
long syncs = prepareRegionForBachPut(puts, source, true);
|
||||||
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
|
|
||||||
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
|
||||||
|
|
||||||
final Put[] puts = new Put[10];
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
|
|
||||||
puts[i].addColumn(cf, qual, val);
|
|
||||||
}
|
|
||||||
|
|
||||||
OperationStatus[] codes = this.region.batchMutate(puts);
|
OperationStatus[] codes = this.region.batchMutate(puts);
|
||||||
assertEquals(10, codes.length);
|
assertEquals(10, codes.length);
|
||||||
|
@ -1690,12 +1732,29 @@ public class TestHRegion {
|
||||||
assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
|
assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
|
||||||
}
|
}
|
||||||
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
this.region = null;
|
this.region = null;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return syncs initial syncTimeNumOps
|
||||||
|
*/
|
||||||
|
private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
|
||||||
|
boolean slop) throws IOException {
|
||||||
|
this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
|
||||||
|
|
||||||
|
LOG.info("First a batch put with all valid puts");
|
||||||
|
for (int i = 0; i < puts.length; i++) {
|
||||||
|
puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
|
||||||
|
new Put(Bytes.toBytes("row_" + i));
|
||||||
|
puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
|
||||||
|
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
|
||||||
|
return syncs;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ////////////////////////////////////////////////////////////////////////////
|
// ////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in New Issue