HBASE-18961 doMiniBatchMutate() is split into smaller member methods of BatchOperation and it's sub-classes
There is no functionality change except for below: * Variable lastIndexExclusive was getting incremented while locking rows corresponding to input operations. As a result when getRowLockInternal() method throws TimeoutIOException only operations in range [nextIndexToProcess, lastIndexExclusive) was getting marked as FAILED before raising exception up the call stack. With these changes all operations are getting marked as FAILED. * Cluster Ids of first mutation is used consistently for entire batch. Previous behavior was to use cluster ids of first mutation in a mini-batch Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
29fd1dead2
commit
4eae5a2974
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -31,7 +30,6 @@ public class NonceKey {
|
|||
private long nonce;
|
||||
|
||||
public NonceKey(long group, long nonce) {
|
||||
assert nonce != HConstants.NO_NONCE;
|
||||
this.group = group;
|
||||
this.nonce = nonce;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -40,13 +41,22 @@ public class MiniBatchOperationInProgress<T> {
|
|||
private final int firstIndex;
|
||||
private final int lastIndexExclusive;
|
||||
|
||||
private int readyToWriteCount = 0;
|
||||
private int cellCount = 0;
|
||||
private int numOfPuts = 0;
|
||||
private int numOfDeletes = 0;
|
||||
|
||||
|
||||
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
|
||||
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
|
||||
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
|
||||
int readyToWriteCount) {
|
||||
Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
|
||||
this.operations = operations;
|
||||
this.retCodeDetails = retCodeDetails;
|
||||
this.walEditsFromCoprocessors = walEditsFromCoprocessors;
|
||||
this.firstIndex = firstIndex;
|
||||
this.lastIndexExclusive = lastIndexExclusive;
|
||||
this.readyToWriteCount = readyToWriteCount;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -127,4 +137,36 @@ public class MiniBatchOperationInProgress<T> {
|
|||
return operationsFromCoprocessors == null ? null :
|
||||
operationsFromCoprocessors[getAbsoluteIndex(index)];
|
||||
}
|
||||
|
||||
public int getReadyToWriteCount() {
|
||||
return readyToWriteCount;
|
||||
}
|
||||
|
||||
public int getLastIndexExclusive() {
|
||||
return lastIndexExclusive;
|
||||
}
|
||||
|
||||
public int getCellCount() {
|
||||
return cellCount;
|
||||
}
|
||||
|
||||
public void addCellCount(int cellCount) {
|
||||
this.cellCount += cellCount;
|
||||
}
|
||||
|
||||
public int getNumOfPuts() {
|
||||
return numOfPuts;
|
||||
}
|
||||
|
||||
public void incrementNumOfPuts() {
|
||||
this.numOfPuts += 1;
|
||||
}
|
||||
|
||||
public int getNumOfDeletes() {
|
||||
return numOfDeletes;
|
||||
}
|
||||
|
||||
public void incrementNumOfDeletes() {
|
||||
this.numOfDeletes += 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ MultiRowMutationProcessorResponse> {
|
|||
if (coprocessorHost != null) {
|
||||
miniBatch = new MiniBatchOperationInProgress<>(
|
||||
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
|
||||
mutations.size());
|
||||
mutations.size(), mutations.size());
|
||||
coprocessorHost.preBatchMutate(miniBatch);
|
||||
}
|
||||
// Apply edits to a single WALEdit
|
||||
|
|
|
@ -44,7 +44,7 @@ public class TestMiniBatchOperationInProgress {
|
|||
}
|
||||
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
|
||||
new MiniBatchOperationInProgress<>(operations, retCodeDetails,
|
||||
walEditsFromCoprocessors, 0, 5);
|
||||
walEditsFromCoprocessors, 0, 5, 5);
|
||||
|
||||
assertEquals(5, miniBatch.size());
|
||||
assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
|
||||
|
@ -69,7 +69,7 @@ public class TestMiniBatchOperationInProgress {
|
|||
}
|
||||
|
||||
miniBatch = new MiniBatchOperationInProgress<>(operations,
|
||||
retCodeDetails, walEditsFromCoprocessors, 7, 10);
|
||||
retCodeDetails, walEditsFromCoprocessors, 7, 10, 3);
|
||||
try {
|
||||
miniBatch.setWalEdit(-1, new WALEdit());
|
||||
fail("Should throw Exception while accessing out of range");
|
||||
|
|
|
@ -867,7 +867,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
|
|||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preBatchMutate(ObserverContextImpl.createAndPrepare(RCP_ENV),
|
||||
new MiniBatchOperationInProgress<>(null, null, null, 0, 0));
|
||||
new MiniBatchOperationInProgress<>(null, null, null, 0, 0, 0));
|
||||
return null;
|
||||
}
|
||||
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);
|
||||
|
|
Loading…
Reference in New Issue