HBASE-18963 Removed MultiRowMutationProcessor and modified mutateRows...() to use batchMutate() with atomic set to true
This commit is contained in:
parent
3d4fd90947
commit
7139113fde
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
@ -47,14 +48,14 @@ import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* This class demonstrates how to implement atomic multi row transactions using
|
||||
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
|
||||
* {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
|
||||
* and Coprocessor endpoints.
|
||||
*
|
||||
* Defines a protocol to perform multi row transactions.
|
||||
* See {@link MultiRowMutationEndpoint} for the implementation.
|
||||
* <br>
|
||||
* See
|
||||
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
|
||||
* {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
|
||||
* for details and limitations.
|
||||
* <br>
|
||||
* Example:
|
||||
|
@ -3279,6 +3279,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
|
||||
WALEdit walEdit) {
|
||||
for (List<Cell> edits : familyMap.values()) {
|
||||
// Optimization: 'foreach' loop is not used. See:
|
||||
// HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects
|
||||
assert edits instanceof RandomAccess;
|
||||
int listSize = edits.size();
|
||||
for (int i=0; i < listSize; i++) {
|
||||
@ -4109,6 +4111,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
throws IOException {
|
||||
for (List<Cell> cells: cellItr) {
|
||||
if (cells == null) continue;
|
||||
// Optimization: 'foreach' loop is not used. See:
|
||||
// HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects
|
||||
assert cells instanceof RandomAccess;
|
||||
int listSize = cells.size();
|
||||
for (int i = 0; i < listSize; i++) {
|
||||
@ -4259,6 +4263,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
}
|
||||
long maxTs = now + timestampSlop;
|
||||
for (List<Cell> kvs : familyMap.values()) {
|
||||
// Optimization: 'foreach' loop is not used. See:
|
||||
// HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects
|
||||
assert kvs instanceof RandomAccess;
|
||||
int listSize = kvs.size();
|
||||
for (int i=0; i < listSize; i++) {
|
||||
@ -7135,20 +7141,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
@Override
|
||||
public void mutateRow(RowMutations rm) throws IOException {
|
||||
// Don't need nonces here - RowMutations only supports puts and deletes
|
||||
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
|
||||
final List<Mutation> m = rm.getMutations();
|
||||
batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform atomic mutations within the region w/o nonces.
|
||||
* See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
|
||||
*/
|
||||
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock) throws IOException {
|
||||
mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform atomic mutations within the region.
|
||||
* Perform atomic (all or none) mutations within the region.
|
||||
* @param mutations The list of mutations to perform.
|
||||
* <code>mutations</code> can contain operations for multiple rows.
|
||||
* Caller has to ensure that all rows are contained in this region.
|
||||
@ -7162,8 +7161,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
@Override
|
||||
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
|
||||
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
|
||||
processRowsWithLocks(proc, -1, nonceGroup, nonce);
|
||||
batchMutate(new MutationBatchOperation(this, mutations.toArray(new Mutation[mutations.size()]),
|
||||
true, nonceGroup, nonce) {
|
||||
@Override
|
||||
public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
|
||||
List<RowLock> acquiredRowLocks) throws IOException {
|
||||
for (byte[] row : rowsToLock) {
|
||||
try {
|
||||
RowLock rowLock = region.getRowLockInternal(row, false); // write lock
|
||||
acquiredRowLocks.add(rowLock);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe);
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
return createMiniBatch(size(), size());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -7193,8 +7207,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
|
||||
@Override
|
||||
public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
|
||||
processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -8094,6 +8107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
|
||||
long mutationSize = 0;
|
||||
for (List<Cell> cells: familyMap.values()) {
|
||||
// Optimization: 'foreach' loop is not used. See:
|
||||
// HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects
|
||||
assert cells instanceof RandomAccess;
|
||||
int listSize = cells.size();
|
||||
for (int i=0; i < listSize; i++) {
|
||||
|
@ -1,209 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
|
||||
MultiRowMutationProcessorResponse> {
|
||||
Collection<byte[]> rowsToLock;
|
||||
Collection<Mutation> mutations;
|
||||
MiniBatchOperationInProgress<Mutation> miniBatch;
|
||||
|
||||
MultiRowMutationProcessor(Collection<Mutation> mutations,
|
||||
Collection<byte[]> rowsToLock) {
|
||||
this.rowsToLock = rowsToLock;
|
||||
this.mutations = mutations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<byte[]> getRowsToLock() {
|
||||
return rowsToLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readOnly() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiRowMutationProcessorResponse getResult() {
|
||||
return MultiRowMutationProcessorResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(long now,
|
||||
HRegion region,
|
||||
List<Mutation> mutationsToApply,
|
||||
WALEdit walEdit) throws IOException {
|
||||
byte[] byteNow = Bytes.toBytes(now);
|
||||
// Check mutations
|
||||
for (Mutation m : this.mutations) {
|
||||
if (m instanceof Put) {
|
||||
Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
|
||||
region.checkFamilies(familyMap.keySet());
|
||||
region.checkTimestamps(familyMap, now);
|
||||
region.updateCellTimestamps(familyMap.values(), byteNow);
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
region.prepareDelete(d);
|
||||
region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
|
||||
} else {
|
||||
throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
|
||||
+ m.getClass().getName());
|
||||
}
|
||||
mutationsToApply.add(m);
|
||||
}
|
||||
// Apply edits to a single WALEdit
|
||||
for (Mutation m : mutations) {
|
||||
for (List<Cell> cells : m.getFamilyCellMap().values()) {
|
||||
boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
|
||||
for (Cell cell : cells) {
|
||||
if (writeToWAL) walEdit.add(cell);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
|
||||
// by pass everything
|
||||
// Is this right? Bypass everything and not just this individual put?
|
||||
// This class is going away in hbase2 so lets not sweat it.
|
||||
return;
|
||||
}
|
||||
} else if (m instanceof Delete) {
|
||||
Delete d = (Delete) m;
|
||||
region.prepareDelete(d);
|
||||
if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) {
|
||||
// by pass everything
|
||||
// Is this right? Bypass everything and not just this individual put?
|
||||
// This class is going away in hbase2 so lets not sweat it.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
|
||||
// TODO we should return back the status of this hook run to HRegion so that those Mutations
|
||||
// with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
OperationStatus[] opStatus = new OperationStatus[mutations.size()];
|
||||
Arrays.fill(opStatus, OperationStatus.NOT_RUN);
|
||||
WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
|
||||
if (coprocessorHost != null) {
|
||||
miniBatch = new MiniBatchOperationInProgress<>(
|
||||
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
|
||||
mutations.size(), mutations.size());
|
||||
coprocessorHost.preBatchMutate(miniBatch);
|
||||
}
|
||||
// Apply edits to a single WALEdit
|
||||
for (int i = 0; i < mutations.size(); i++) {
|
||||
if (opStatus[i] == OperationStatus.NOT_RUN) {
|
||||
// Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook
|
||||
// itself. No need to apply again to region
|
||||
if (walEditsFromCP[i] != null) {
|
||||
// Add the WALEdit created by CP hook
|
||||
for (Cell walCell : walEditsFromCP[i].getCells()) {
|
||||
walEdit.add(walCell);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBatchMutate(HRegion region) throws IOException {
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
if (coprocessorHost != null) {
|
||||
assert miniBatch != null;
|
||||
// Use the same miniBatch state used to call the preBatchMutate()
|
||||
coprocessorHost.postBatchMutate(miniBatch);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
if (coprocessorHost != null) {
|
||||
for (Mutation m : mutations) {
|
||||
if (m instanceof Put) {
|
||||
coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
|
||||
} else if (m instanceof Delete) {
|
||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
|
||||
}
|
||||
}
|
||||
// At the end call the CP hook postBatchMutateIndispensably
|
||||
if (miniBatch != null) {
|
||||
// Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a
|
||||
// read only process. Then no need to call this batch based CP hook also.
|
||||
coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiRowMutationProcessorRequest getRequestData() {
|
||||
return MultiRowMutationProcessorRequest.getDefaultInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(MultiRowMutationProcessorRequest msg) {
|
||||
//nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public Durability useDurability() {
|
||||
// return true when at least one mutation requested a WAL flush (default)
|
||||
Durability durability = Durability.USE_DEFAULT;
|
||||
for (Mutation m : mutations) {
|
||||
if (m.getDurability().ordinal() > durability.ordinal()) {
|
||||
durability = m.getDurability();
|
||||
}
|
||||
}
|
||||
return durability;
|
||||
}
|
||||
}
|
@ -551,51 +551,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
|
||||
/**
|
||||
* Mutate a list of rows atomically.
|
||||
*
|
||||
* @param region
|
||||
* @param actions
|
||||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void mutateRows(final HRegion region,
|
||||
final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
private void mutateRows(final HRegion region, final OperationQuota quota,
|
||||
final List<ClientProtos.Action> actions, final CellScanner cellScanner,
|
||||
RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
throws IOException {
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
rm.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
region.mutateRow(rm);
|
||||
doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -991,15 +959,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
}
|
||||
|
||||
// HBASE-17924
|
||||
// sort to improve lock efficiency
|
||||
Arrays.sort(mArray);
|
||||
// Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly
|
||||
// called from mutateRows()), order is preserved as its expected from the client
|
||||
if (!atomic) {
|
||||
Arrays.sort(mArray);
|
||||
}
|
||||
|
||||
OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
for (i = 0; i < codes.length; i++) {
|
||||
Mutation currentMutation = mArray[i];
|
||||
ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
|
||||
int index = currentAction.getIndex();
|
||||
int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
|
||||
Exception e = null;
|
||||
switch (codes[i].getOperationStatusCode()) {
|
||||
case BAD_FAMILY:
|
||||
@ -1027,8 +998,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
if (atomic) {
|
||||
throw ie;
|
||||
}
|
||||
for (int i = 0; i < mutations.size(); i++) {
|
||||
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
|
||||
for (Action mutation : mutations) {
|
||||
builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
|
||||
}
|
||||
}
|
||||
if (regionServer.metricsRegionServer != null) {
|
||||
@ -1165,9 +1136,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
|
||||
SimpleRpcSchedulerFactory.class);
|
||||
rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
|
||||
} catch (InstantiationException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
} catch (InstantiationException | IllegalAccessException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
// Server to handle client requests.
|
||||
@ -2580,8 +2549,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
cellScanner, row, family, qualifier, op,
|
||||
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
|
||||
} else {
|
||||
mutateRows(region, regionAction.getActionList(), cellScanner,
|
||||
regionActionResultBuilder);
|
||||
mutateRows(region, quota, regionAction.getActionList(), cellScanner,
|
||||
regionActionResultBuilder, spaceQuotaEnforcement);
|
||||
processed = Boolean.TRUE;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
@ -2604,7 +2573,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
}
|
||||
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
|
||||
quota.close();
|
||||
ClientProtos.RegionLoadStats regionLoadStats = ((HRegion)region).getLoadStatistics();
|
||||
ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
|
||||
if(regionLoadStats != null) {
|
||||
regionStats.put(regionSpecifier, regionLoadStats);
|
||||
}
|
||||
@ -2656,7 +2625,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||
*
|
||||
* @param rpcc the RPC controller
|
||||
* @param request the mutate request
|
||||
* @throws ServiceException
|
||||
*/
|
||||
@Override
|
||||
public MutateResponse mutate(final RpcController rpcc,
|
||||
|
Loading…
x
Reference in New Issue
Block a user