HBASE-18961 doMiniBatchMutate() is split into smaller member methods of BatchOperation and it's sub-classes

There is no functionality change except for below:
* Variable lastIndexExclusive was getting incremented while locking rows corresponding to input
  operations. As a result when getRowLockInternal() method throws TimeoutIOException only operations
  in range [nextIndexToProcess, lastIndexExclusive) was getting marked as FAILED before raising
  exception up the call stack. With these changes all operations are getting marked as FAILED.
* Cluster Ids of first mutation is used consistently for entire batch. Previous behavior was to use
  cluster ids of first mutation in a mini-batch

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Umesh Agashe 2017-10-08 00:31:12 -07:00 committed by Michael Stack
parent 1110910b3a
commit fa3cc6c3bc
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
6 changed files with 807 additions and 581 deletions

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -31,7 +30,6 @@ public class NonceKey {
private long nonce;
public NonceKey(long group, long nonce) {
assert nonce != HConstants.NO_NONCE;
this.group = group;
this.nonce = nonce;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -40,13 +41,22 @@ public class MiniBatchOperationInProgress<T> {
private final int firstIndex;
private final int lastIndexExclusive;
private int readyToWriteCount = 0;
private int cellCount = 0;
private int numOfPuts = 0;
private int numOfDeletes = 0;
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
int readyToWriteCount) {
Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
this.operations = operations;
this.retCodeDetails = retCodeDetails;
this.walEditsFromCoprocessors = walEditsFromCoprocessors;
this.firstIndex = firstIndex;
this.lastIndexExclusive = lastIndexExclusive;
this.readyToWriteCount = readyToWriteCount;
}
/**
@ -127,4 +137,36 @@ public class MiniBatchOperationInProgress<T> {
return operationsFromCoprocessors == null ? null :
operationsFromCoprocessors[getAbsoluteIndex(index)];
}
public int getReadyToWriteCount() {
return readyToWriteCount;
}
public int getLastIndexExclusive() {
return lastIndexExclusive;
}
public int getCellCount() {
return cellCount;
}
public void addCellCount(int cellCount) {
this.cellCount += cellCount;
}
public int getNumOfPuts() {
return numOfPuts;
}
public void incrementNumOfPuts() {
this.numOfPuts += 1;
}
public int getNumOfDeletes() {
return numOfDeletes;
}
public void incrementNumOfDeletes() {
this.numOfDeletes += 1;
}
}

View File

@ -137,7 +137,7 @@ MultiRowMutationProcessorResponse> {
if (coprocessorHost != null) {
miniBatch = new MiniBatchOperationInProgress<>(
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
mutations.size());
mutations.size(), mutations.size());
coprocessorHost.preBatchMutate(miniBatch);
}
// Apply edits to a single WALEdit

View File

@ -44,7 +44,7 @@ public class TestMiniBatchOperationInProgress {
}
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
new MiniBatchOperationInProgress<>(operations, retCodeDetails,
walEditsFromCoprocessors, 0, 5);
walEditsFromCoprocessors, 0, 5, 5);
assertEquals(5, miniBatch.size());
assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
@ -69,7 +69,7 @@ public class TestMiniBatchOperationInProgress {
}
miniBatch = new MiniBatchOperationInProgress<>(operations,
retCodeDetails, walEditsFromCoprocessors, 7, 10);
retCodeDetails, walEditsFromCoprocessors, 7, 10, 3);
try {
miniBatch.setWalEdit(-1, new WALEdit());
fail("Should throw Exception while accessing out of range");

View File

@ -867,7 +867,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preBatchMutate(ObserverContextImpl.createAndPrepare(RCP_ENV),
new MiniBatchOperationInProgress<>(null, null, null, 0, 0));
new MiniBatchOperationInProgress<>(null, null, null, 0, 0, 0));
return null;
}
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);