HBASE-6284 HRegion#doMiniBatchMutation() (Anoop)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1356566 13f79535-47bb-0310-9956-ffa450edef68
commit 6505939970
parent 5eacb5a6cf
@@ -1778,15 +1778,14 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Setup a Delete object with correct timestamps.
-   * Caller should the row and region locks.
-   * @param delete
+   * Setup correct timestamps in the KVs in Delete object.
+   * Caller should have the row and region locks.
+   * @param familyMap
    * @param now
    * @throws IOException
    */
-  void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+  void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
       throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
 
       byte[] family = e.getKey();
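
Note: the familyMap-based signature lets every caller share one implementation, including the new batch path, which only has per-operation family maps in hand. The call sites as they appear later in this diff:

    prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);    // HRegion.delete()
    prepareDeleteTimestamps(familyMaps[i], byteNow);            // doMiniBatchMutation()
    region.prepareDeleteTimestamps(d.getFamilyMap(), byteNow);  // MultiRowMutationProcessor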
@@ -1855,7 +1854,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     updatesLock.readLock().lock();
     try {
-      prepareDeleteTimestamps(delete, byteNow);
+      prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
 
       if (writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
@@ -1986,27 +1985,27 @@ public class HRegion implements HeapSize { // , Writable{
    */
   public OperationStatus[] put(Put[] puts) throws IOException {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
 
     for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
     }
-    return put(putsAndLocks);
+    return batchMutate(putsAndLocks);
   }
 
   /**
-   * Perform a batch of puts.
-   *
+   * Perform a batch of mutations.
+   * It supports only Put and Delete mutations and will ignore other types passed.
    * @param putsAndLocks
-   *          the list of puts paired with their requested lock IDs.
+   *          the list of mutations paired with their requested lock IDs.
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] put(
-      Pair<Put, Integer>[] putsAndLocks) throws IOException {
-    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
+  public OperationStatus[] batchMutate(
+      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
+    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
 
     boolean initialized = false;
 
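
Note: a caller-side sketch (not part of this commit) of the widened entry point; a single batch may now mix Puts and Deletes. The row, family, and qualifier names below are hypothetical:

    // Hypothetical usage of the new batchMutate(); the raw Pair array mirrors
    // the unchecked array already used by put(Put[]) above.
    @SuppressWarnings("unchecked")
    Pair<Mutation, Integer>[] ops = new Pair[2];
    ops[0] = new Pair<Mutation, Integer>(
        new Put(Bytes.toBytes("row1")).add(
            Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")), null);
    ops[1] = new Pair<Mutation, Integer>(new Delete(Bytes.toBytes("row2")), null);
    OperationStatus[] codes = region.batchMutate(ops); // one status per mutation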
@@ -2020,10 +2019,10 @@ public class HRegion implements HeapSize { // , Writable{
       try {
         if (!initialized) {
           this.writeRequestsCount.increment();
-          doPrePutHook(batchOp);
+          doPreMutationHook(batchOp);
           initialized = true;
         }
-        long addedSize = doMiniBatchPut(batchOp);
+        long addedSize = doMiniBatchMutation(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
         closeRegionOperation();
@@ -2035,18 +2034,32 @@ public class HRegion implements HeapSize { // , Writable{
     return batchOp.retCodeDetails;
   }
 
-  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0 ; i < batchOp.operations.length; i++) {
-        Pair<Put, Integer> nextPair = batchOp.operations[i];
-        Put put = nextPair.getFirst();
-        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
-          // pre hook says skip this Put
-          // mark as success and skip in doMiniBatchPut
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
+        Mutation m = nextPair.getFirst();
+        if (m instanceof Put) {
+          if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Put
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else if (m instanceof Delete) {
+          if (coprocessorHost.preDelete((Delete) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Delete
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else {
+          // In case of passing Append mutations along with the Puts and Deletes in batchMutate
+          // mark the operation return code as failure so that it will not be considered in
+          // the doMiniBatchMutation
+          batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
+              "Put/Delete mutations only supported in batchMutate() now");
         }
         if (!walEdit.isEmpty()) {
          batchOp.walEditsFromCoprocessors[i] = walEdit;
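
Note: a minimal RegionObserver sketch (hypothetical, assuming the coprocessor API of this era) of the hook behind the new Delete branch; when the observer bypasses the operation, coprocessorHost.preDelete() returns true and the slot above is marked SUCCESS and skipped:

    // Illustrative observer, not part of this commit.
    public class SkipAllDeletesObserver extends BaseRegionObserver {
      @Override
      public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c,
          Delete delete, WALEdit edit, boolean writeToWAL) throws IOException {
        c.bypass(); // doMiniBatchMutation() will never see this Delete
      }
    }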
@@ -2058,15 +2071,18 @@ public class HRegion implements HeapSize { // , Writable{
 
   @SuppressWarnings("unchecked")
-  private long doMiniBatchPut(
-      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+  private long doMiniBatchMutation(
+      BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
 
     // variable to note if all Put items are for the same CF -- metrics related
-    boolean cfSetConsistent = true;
-    //The set of columnFamilies first seen.
-    Set<byte[]> cfSet = null;
+    boolean putsCfSetConsistent = true;
+    //The set of columnFamilies first seen for Put.
+    Set<byte[]> putsCfSet = null;
+    // variable to note if all Delete items are for the same CF -- metrics related
+    boolean deletesCfSetConsistent = true;
+    //The set of columnFamilies first seen for Delete.
+    Set<byte[]> deletesCfSet = null;
 
     WALEdit walEdit = new WALEdit();
 
     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
@@ -2085,6 +2101,7 @@ public class HRegion implements HeapSize { // , Writable{
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
+    int noOfPuts = 0, noOfDeletes = 0;
     try {
       // ------------------------------------
       // STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2093,11 +2110,12 @@ public class HRegion implements HeapSize { // , Writable{
     int numReadyToWrite = 0;
     long now = EnvironmentEdgeManager.currentTimeMillis();
     while (lastIndexExclusive < batchOp.operations.length) {
-      Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-      Put put = nextPair.getFirst();
+      Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+      Mutation mutation = nextPair.getFirst();
+      boolean isPutMutation = mutation instanceof Put;
       Integer providedLockId = nextPair.getSecond();
 
-      Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+      Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
       // store the family map reference to allow for mutations
       familyMaps[lastIndexExclusive] = familyMap;
@@ -2108,22 +2126,25 @@ public class HRegion implements HeapSize { // , Writable{
         continue;
       }
 
-      // Check the families in the put. If bad, skip this one.
       try {
-        checkFamilies(familyMap.keySet());
-        checkTimestamps(put, now);
+        if (isPutMutation) {
+          // Check the families in the put. If bad, skip this one.
+          checkFamilies(familyMap.keySet());
+          checkTimestamps(mutation.getFamilyMap(), now);
+        } else {
+          prepareDelete((Delete) mutation);
+        }
       } catch (DoNotRetryIOException dnrioe) {
-        LOG.warn("No such column family in batch put", dnrioe);
+        LOG.warn("No such column family in batch mutation", dnrioe);
         batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
             OperationStatusCode.SANITY_CHECK_FAILURE, dnrioe.getMessage());
         lastIndexExclusive++;
         continue;
       }
 
       // If we haven't got any rows in our batch, we should block to
       // get the next one.
       boolean shouldBlock = numReadyToWrite == 0;
-      Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+      Integer acquiredLockId = getLock(providedLockId, mutation.getRow(), shouldBlock);
       if (acquiredLockId == null) {
         // We failed to grab another lock
         assert !shouldBlock : "Should never fail to get lock when blocking";
@@ -2135,25 +2156,35 @@ public class HRegion implements HeapSize { // , Writable{
       lastIndexExclusive++;
       numReadyToWrite++;
 
-      //If Column Families stay consistent through out all of the
-      //individual puts then metrics can be reported as a mutliput across
-      //column families in the first put.
-      if (cfSet == null) {
-        cfSet = put.getFamilyMap().keySet();
+      if (isPutMutation) {
+        // If Column Families stay consistent through out all of the
+        // individual puts then metrics can be reported as a mutliput across
+        // column families in the first put.
+        if (putsCfSet == null) {
+          putsCfSet = mutation.getFamilyMap().keySet();
+        } else {
+          putsCfSetConsistent = putsCfSetConsistent
+              && mutation.getFamilyMap().keySet().equals(putsCfSet);
+        }
       } else {
-        cfSetConsistent = cfSetConsistent && put.getFamilyMap().keySet().equals(cfSet);
+        if (deletesCfSet == null) {
+          deletesCfSet = mutation.getFamilyMap().keySet();
+        } else {
+          deletesCfSetConsistent = deletesCfSetConsistent
+              && mutation.getFamilyMap().keySet().equals(deletesCfSet);
+        }
+      }
       }
     }
 
     // we should record the timestamp only after we have acquired the rowLock,
-    // otherwise, newer puts are not guaranteed to have a newer timestamp
+    // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
     now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
 
-    // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
+    // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
     if (numReadyToWrite <= 0) return 0L;
 
-    // We've now grabbed as many puts off the list as we can
+    // We've now grabbed as many mutations off the list as we can
 
     // ------------------------------------
     // STEP 2. Update any LATEST_TIMESTAMP timestamps
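
Note: the keySet().equals() comparisons above work on byte[] keys because Mutation's family map is a TreeMap ordered by Bytes.BYTES_COMPARATOR, so membership is decided by byte content rather than array identity. A quick illustration (values hypothetical):

    Set<byte[]> a = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    a.add(Bytes.toBytes("cf"));
    Set<byte[]> b = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    b.add(Bytes.toBytes("cf"));
    assert a.equals(b); // true: distinct arrays, same bytes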
@@ -2163,9 +2194,14 @@ public class HRegion implements HeapSize { // , Writable{
       if (batchOp.retCodeDetails[i].getOperationStatusCode()
           != OperationStatusCode.NOT_RUN) continue;
-      updateKVTimestamps(
-          familyMaps[i].values(),
-          byteNow);
+      Mutation mutation = batchOp.operations[i].getFirst();
+      if (mutation instanceof Put) {
+        updateKVTimestamps(familyMaps[i].values(), byteNow);
+        noOfPuts++;
+      } else {
+        prepareDeleteTimestamps(familyMaps[i], byteNow);
+        noOfDeletes++;
+      }
     }
 
     this.updatesLock.readLock().lock();
@@ -2206,9 +2242,11 @@ public class HRegion implements HeapSize { // , Writable{
       }
       batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-      Put p = batchOp.operations[i].getFirst();
-      if (!p.getWriteToWAL()) {
-        recordPutWithoutWal(p.getFamilyMap());
+      Mutation m = batchOp.operations[i].getFirst();
+      if (!m.getWriteToWAL()) {
+        if (m instanceof Put) {
+          recordPutWithoutWal(m.getFamilyMap());
+        }
         continue;
       }
       // Add WAL edits by CP
@@ -2225,7 +2263,7 @@ public class HRegion implements HeapSize { // , Writable{
     // -------------------------
     // STEP 5. Append the edit to WAL. Do not sync wal.
     // -------------------------
-    Put first = batchOp.operations[firstIndex].getFirst();
+    Mutation first = batchOp.operations[firstIndex].getFirst();
     txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
         walEdit, first.getClusterId(), now, this.htableDescriptor);
 
@@ -2261,7 +2299,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     // ------------------------------------
     // STEP 9. Run coprocessor post hooks. This should be done after the wal is
-    // sycned so that the coprocessor contract is adhered to.
+    // synced so that the coprocessor contract is adhered to.
     // ------------------------------------
     if (coprocessorHost != null) {
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
|||
!= OperationStatusCode.SUCCESS) {
|
||||
continue;
|
||||
}
|
||||
Put p = batchOp.operations[i].getFirst();
|
||||
coprocessorHost.postPut(p, walEdit, p.getWriteToWAL());
|
||||
Mutation m = batchOp.operations[i].getFirst();
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||
} else {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -2296,14 +2338,26 @@ public class HRegion implements HeapSize { // , Writable{
     }
 
     // do after lock
-    final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis();
+    final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis() - startTimeMs;
 
-    //See if the column families were consistent through the whole thing.
-    //if they were then keep them. If they were not then pass a null.
-    //null will be treated as unknown.
-    final Set<byte[]> keptCfs = cfSetConsistent ? cfSet : null;
-    this.opMetrics.updateMultiPutMetrics(keptCfs, endTimeMs - startTimeMs);
-
+    // See if the column families were consistent through the whole thing.
+    // if they were then keep them. If they were not then pass a null.
+    // null will be treated as unknown.
+    // Total time taken might be involving Puts and Deletes.
+    // Split the time for puts and deletes based on the total number of Puts and Deletes.
+    long timeTakenForPuts = 0;
+    if (noOfPuts > 0) {
+      // There were some Puts in the batch.
+      double noOfMutations = noOfPuts + noOfDeletes;
+      timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations));
+      final Set<byte[]> keptCfs = putsCfSetConsistent ? putsCfSet : null;
+      this.opMetrics.updateMultiPutMetrics(keptCfs, timeTakenForPuts);
+    }
+    if (noOfDeletes > 0) {
+      // There were some Deletes in the batch.
+      final Set<byte[]> keptCfs = deletesCfSetConsistent ? deletesCfSet : null;
+      this.opMetrics.updateMultiDeleteMetrics(keptCfs, netTimeMs - timeTakenForPuts);
+    }
     if (!success) {
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
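
Note: the metrics split above is purely proportional to operation counts. A worked example with hypothetical numbers:

    // If the mini-batch took 100 ms with 3 Puts and 1 Delete:
    long netTimeMs = 100;
    int noOfPuts = 3, noOfDeletes = 1;
    double noOfMutations = noOfPuts + noOfDeletes;                            // 4.0
    long timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations));  // 75 ms
    long timeTakenForDeletes = netTimeMs - timeTakenForPuts;                  // 25 ms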
@@ -2612,10 +2666,10 @@ public class HRegion implements HeapSize { // , Writable{
 
   /**
    * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put has updated memstore but subequently fails to update
+   * called when a Put/Delete has updated memstore but subequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
                                 Map<byte[], List<KeyValue>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -2656,9 +2710,6 @@ public class HRegion implements HeapSize { // , Writable{
       checkFamily(family);
     }
   }
-  private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
-    checkTimestamps(p.getFamilyMap(), now);
-  }
 
   void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
       long now) throws DoNotRetryIOException {
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
@@ -3224,7 +3225,7 @@ public class HRegionServer implements ClientProtocol,
         mutateRows(region, mutates);
       } else {
         ActionResult.Builder resultBuilder = null;
-        List<Mutate> puts = new ArrayList<Mutate>();
+        List<Mutate> mutates = new ArrayList<Mutate>();
         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
           requestCount.incrementAndGet();
           try {
|
|||
} else if (actionUnion.hasMutate()) {
|
||||
Mutate mutate = actionUnion.getMutate();
|
||||
MutateType type = mutate.getMutateType();
|
||||
if (type != MutateType.PUT) {
|
||||
if (!puts.isEmpty()) {
|
||||
put(builder, region, puts);
|
||||
puts.clear();
|
||||
if (type != MutateType.PUT && type != MutateType.DELETE) {
|
||||
if (!mutates.isEmpty()) {
|
||||
doBatchOp(builder, region, mutates);
|
||||
mutates.clear();
|
||||
} else if (!region.getRegionInfo().isMetaTable()) {
|
||||
cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
|
@@ -3256,17 +3257,13 @@ public class HRegionServer implements ClientProtocol,
                 r = increment(region, mutate);
                 break;
               case PUT:
-                puts.add(mutate);
+                mutates.add(mutate);
                 break;
               case DELETE:
-                Delete delete = ProtobufUtil.toDelete(mutate);
-                Integer lock = getLockFromId(delete.getLockId());
-                region.delete(delete, lock, delete.getWriteToWAL());
-                r = new Result();
+                mutates.add(mutate);
                 break;
-              default:
-                throw new DoNotRetryIOException(
-                  "Unsupported mutate type: " + type.name());
+              default:
+                throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
               }
               if (r != null) {
                 result = ProtobufUtil.toResult(r);
@@ -3294,8 +3291,8 @@ public class HRegionServer implements ClientProtocol,
             builder.addResult(ResponseConverter.buildActionResult(ie));
           }
         }
-        if (!puts.isEmpty()) {
-          put(builder, region, puts);
+        if (!mutates.isEmpty()) {
+          doBatchOp(builder, region, mutates);
         }
       }
       return builder.build();
@@ -3755,16 +3752,16 @@ public class HRegionServer implements ClientProtocol,
   }
 
   /**
-   * Execute a list of put mutations.
+   * Execute a list of Put/Delete mutations.
    *
    * @param builder
    * @param region
-   * @param puts
+   * @param mutates
    */
-  protected void put(final MultiResponse.Builder builder,
-      final HRegion region, final List<Mutate> puts) {
+  protected void doBatchOp(final MultiResponse.Builder builder,
+      final HRegion region, final List<Mutate> mutates) {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+    Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
 
     try {
       ActionResult.Builder resultBuilder = ActionResult.newBuilder();
@@ -3773,19 +3770,24 @@ public class HRegionServer implements ClientProtocol,
       ActionResult result = resultBuilder.build();
 
       int i = 0;
-      for (Mutate put : puts) {
-        Put p = ProtobufUtil.toPut(put);
-        Integer lock = getLockFromId(p.getLockId());
-        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+      for (Mutate m : mutates) {
+        Mutation mutation = null;
+        if (m.getMutateType() == MutateType.PUT) {
+          mutation = ProtobufUtil.toPut(m);
+        } else {
+          mutation = ProtobufUtil.toDelete(m);
+        }
+        Integer lock = getLockFromId(mutation.getLockId());
+        mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, lock);
         builder.addResult(result);
       }
 
-      requestCount.addAndGet(puts.size());
+      requestCount.addAndGet(mutates.size());
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.put(putsWithLocks);
+      OperationStatus codes[] = region.batchMutate(mutationsWithLocks);
       for (i = 0; i < codes.length; i++) {
         if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
           result = ResponseConverter.buildActionResult(
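
Note: the conversion loop above is a type dispatch over the wire enum; because both Put and Delete extend Mutation, one array feeds batchMutate(). An equivalent compact sketch (other MutateTypes were already diverted by the caller):

    Mutation mutation = (m.getMutateType() == MutateType.PUT)
        ? ProtobufUtil.toPut(m)      // Put extends Mutation
        : ProtobufUtil.toDelete(m);  // Delete extends Mutation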
@@ -3795,7 +3797,7 @@ public class HRegionServer implements ClientProtocol,
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
-      for (int i = 0, n = puts.size(); i < n; i++) {
+      for (int i = 0, n = mutates.size(); i < n; i++) {
         builder.setResult(i, result);
       }
     }
@@ -69,7 +69,7 @@ class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
       } else if (m instanceof Delete) {
         Delete d = (Delete) m;
         region.prepareDelete(d);
-        region.prepareDeleteTimestamps(d, byteNow);
+        region.prepareDeleteTimestamps(d.getFamilyMap(), byteNow);
       } else {
         throw new DoNotRetryIOException(
           "Action must be Put or Delete. But was: "
@@ -46,6 +46,7 @@ public class OperationMetrics {
   private static final String ICV_KEY = "incrementColumnValue_";
   private static final String INCREMENT_KEY = "increment_";
   private static final String MULTIPUT_KEY = "multiput_";
+  private static final String MULTIDELETE_KEY = "multidelete_";
   private static final String APPEND_KEY = "append_";
 
   /** Conf key controlling whether we should expose metrics.*/
@@ -101,6 +102,16 @@ public class OperationMetrics {
     doUpdateTimeVarying(columnFamilies, MULTIPUT_KEY, value);
   }
 
+  /**
+   * Update the stats associated with {@link HTable#delete(java.util.List)}.
+   *
+   * @param columnFamilies Set of CF's this multidelete is associated with
+   * @param value the time
+   */
+  public void updateMultiDeleteMetrics(Set<byte[]> columnFamilies, long value) {
+    doUpdateTimeVarying(columnFamilies, MULTIDELETE_KEY, value);
+  }
+
   /**
    * Update the metrics associated with a {@link Get}
    *
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -610,14 +611,14 @@ public class TestHRegion extends HBaseTestCase {
     LOG.info("Nexta, a batch put which uses an already-held lock");
     lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
     LOG.info("...obtained row lock");
-    List<Pair<Put, Integer>> putsAndLocks = Lists.newArrayList();
+    List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
     for (int i = 0; i < 10; i++) {
-      Pair<Put, Integer> pair = new Pair<Put, Integer>(puts[i], null);
+      Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
       if (i == 2) pair.setSecond(lockedRow);
       putsAndLocks.add(pair);
     }
 
-    codes = region.put(putsAndLocks.toArray(new Pair[0]));
+    codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
     LOG.info("...performed put");
     for (int i = 0; i < 10; i++) {
       assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :