diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index 5d44cd6cd1c..d2307735c1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.SortedSet; @@ -47,14 +48,14 @@ import com.google.protobuf.Service; /** * This class demonstrates how to implement atomic multi row transactions using - * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)} + * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} * and Coprocessor endpoints. * * Defines a protocol to perform multi row transactions. * See {@link MultiRowMutationEndpoint} for the implementation. *
* See - * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)} + * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} * for details and limitations. *
* Example: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 197aa3ce808..3a3cb03bf3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3279,6 +3279,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void addFamilyMapToWALEdit(Map> familyMap, WALEdit walEdit) { for (List edits : familyMap.values()) { + // Optimization: 'foreach' loop is not used. See: + // HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects assert edits instanceof RandomAccess; int listSize = edits.size(); for (int i=0; i < listSize; i++) { @@ -4109,6 +4111,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { for (List cells: cellItr) { if (cells == null) continue; + // Optimization: 'foreach' loop is not used. See: + // HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects assert cells instanceof RandomAccess; int listSize = cells.size(); for (int i = 0; i < listSize; i++) { @@ -4259,6 +4263,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long maxTs = now + timestampSlop; for (List kvs : familyMap.values()) { + // Optimization: 'foreach' loop is not used. See: + // HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects assert kvs instanceof RandomAccess; int listSize = kvs.size(); for (int i=0; i < listSize; i++) { @@ -7135,20 +7141,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + final List m = rm.getMutations(); + batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE, + HConstants.NO_NONCE); } /** - * Perform atomic mutations within the region w/o nonces. - * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} - */ - public void mutateRowsWithLocks(Collection mutations, - Collection rowsToLock) throws IOException { - mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); - } - - /** - * Perform atomic mutations within the region. + * Perform atomic (all or none) mutations within the region. * @param mutations The list of mutations to perform. * mutations can contain operations for multiple rows. * Caller has to ensure that all rows are contained in this region. @@ -7162,8 +7161,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { - MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); - processRowsWithLocks(proc, -1, nonceGroup, nonce); + batchMutate(new MutationBatchOperation(this, mutations.toArray(new Mutation[mutations.size()]), + true, nonceGroup, nonce) { + @Override + public MiniBatchOperationInProgress lockRowsAndBuildMiniBatch( + List acquiredRowLocks) throws IOException { + for (byte[] row : rowsToLock) { + try { + RowLock rowLock = region.getRowLockInternal(row, false); // write lock + acquiredRowLocks.add(rowLock); + } catch (IOException ioe) { + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe); + throw ioe; + } + } + return createMiniBatch(size(), size()); + } + }); } /** @@ -7193,8 +7207,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor) throws IOException { - processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, - HConstants.NO_NONCE); + processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE); } @Override @@ -8094,6 +8107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long mutationSize = 0; for (List cells: familyMap.values()) { + // Optimization: 'foreach' loop is not used. See: + // HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects assert cells instanceof RandomAccess; int listSize = cells.size(); for (int i=0; i < listSize; i++) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java deleted file mode 100644 index 0d9d149ed7f..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * A MultiRowProcessor that performs multiple puts and deletes. - */ -@InterfaceAudience.Private -class MultiRowMutationProcessor extends BaseRowProcessor { - Collection rowsToLock; - Collection mutations; - MiniBatchOperationInProgress miniBatch; - - MultiRowMutationProcessor(Collection mutations, - Collection rowsToLock) { - this.rowsToLock = rowsToLock; - this.mutations = mutations; - } - - @Override - public Collection getRowsToLock() { - return rowsToLock; - } - - @Override - public boolean readOnly() { - return false; - } - - @Override - public MultiRowMutationProcessorResponse getResult() { - return MultiRowMutationProcessorResponse.getDefaultInstance(); - } - - @Override - public void process(long now, - HRegion region, - List mutationsToApply, - WALEdit walEdit) throws IOException { - byte[] byteNow = Bytes.toBytes(now); - // Check mutations - for (Mutation m : this.mutations) { - if (m instanceof Put) { - Map> familyMap = m.getFamilyCellMap(); - region.checkFamilies(familyMap.keySet()); - region.checkTimestamps(familyMap, now); - region.updateCellTimestamps(familyMap.values(), byteNow); - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow); - } else { - throw new DoNotRetryIOException("Action must be Put or Delete. But was: " - + m.getClass().getName()); - } - mutationsToApply.add(m); - } - // Apply edits to a single WALEdit - for (Mutation m : mutations) { - for (List cells : m.getFamilyCellMap().values()) { - boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL; - for (Cell cell : cells) { - if (writeToWAL) walEdit.add(cell); - } - } - } - } - - @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { - // by pass everything - // Is this right? Bypass everything and not just this individual put? - // This class is going away in hbase2 so lets not sweat it. - return; - } - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { - // by pass everything - // Is this right? Bypass everything and not just this individual put? - // This class is going away in hbase2 so lets not sweat it. - return; - } - } - } - } - } - - @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { - // TODO we should return back the status of this hook run to HRegion so that those Mutations - // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - OperationStatus[] opStatus = new OperationStatus[mutations.size()]; - Arrays.fill(opStatus, OperationStatus.NOT_RUN); - WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; - if (coprocessorHost != null) { - miniBatch = new MiniBatchOperationInProgress<>( - mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, - mutations.size(), mutations.size()); - coprocessorHost.preBatchMutate(miniBatch); - } - // Apply edits to a single WALEdit - for (int i = 0; i < mutations.size(); i++) { - if (opStatus[i] == OperationStatus.NOT_RUN) { - // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook - // itself. No need to apply again to region - if (walEditsFromCP[i] != null) { - // Add the WALEdit created by CP hook - for (Cell walCell : walEditsFromCP[i].getCells()) { - walEdit.add(walCell); - } - } - } - } - } - - @Override - public void postBatchMutate(HRegion region) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - assert miniBatch != null; - // Use the same miniBatch state used to call the preBatchMutate() - coprocessorHost.postBatchMutate(miniBatch); - } - } - - @Override - public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); - } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); - } - } - // At the end call the CP hook postBatchMutateIndispensably - if (miniBatch != null) { - // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a - // read only process. Then no need to call this batch based CP hook also. - coprocessorHost.postBatchMutateIndispensably(miniBatch, success); - } - } - } - - @Override - public MultiRowMutationProcessorRequest getRequestData() { - return MultiRowMutationProcessorRequest.getDefaultInstance(); - } - - @Override - public void initialize(MultiRowMutationProcessorRequest msg) { - //nothing - } - - @Override - public Durability useDurability() { - // return true when at least one mutation requested a WAL flush (default) - Durability durability = Durability.USE_DEFAULT; - for (Mutation m : mutations) { - if (m.getDurability().ordinal() > durability.ordinal()) { - durability = m.getDurability(); - } - } - return durability; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 68e7049cdbb..3f2dd271adc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -551,51 +551,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Mutate a list of rows atomically. - * - * @param region - * @param actions * @param cellScanner if non-null, the mutation data -- the Cell content. - * @throws IOException */ - private void mutateRows(final HRegion region, - final List actions, - final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { - if (!region.getRegionInfo().isMetaRegion()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); - } - RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = - ClientProtos.ResultOrException.newBuilder(); + private void mutateRows(final HRegion region, final OperationQuota quota, + final List actions, final CellScanner cellScanner, + RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) + throws IOException { for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - rm.add(put); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); - } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); } - region.mutateRow(rm); + doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); } /** @@ -991,15 +959,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // HBASE-17924 - // sort to improve lock efficiency - Arrays.sort(mArray); + // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly + // called from mutateRows()), order is preserved as its expected from the client + if (!atomic) { + Arrays.sort(mArray); + } OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); for (i = 0; i < codes.length; i++) { Mutation currentMutation = mArray[i]; ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); - int index = currentAction.getIndex(); + int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i; Exception e = null; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: @@ -1027,8 +998,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (atomic) { throw ie; } - for (int i = 0; i < mutations.size(); i++) { - builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); + for (Action mutation : mutations) { + builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); } } if (regionServer.metricsRegionServer != null) { @@ -1165,9 +1136,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, SimpleRpcSchedulerFactory.class); rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance()); - } catch (InstantiationException e) { - throw new IllegalArgumentException(e); - } catch (IllegalAccessException e) { + } catch (InstantiationException | IllegalAccessException e) { throw new IllegalArgumentException(e); } // Server to handle client requests. @@ -2580,8 +2549,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, regionAction.getActionList(), cellScanner, - regionActionResultBuilder); + mutateRows(region, quota, regionAction.getActionList(), cellScanner, + regionActionResultBuilder, spaceQuotaEnforcement); processed = Boolean.TRUE; } } catch (IOException e) { @@ -2604,7 +2573,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); quota.close(); - ClientProtos.RegionLoadStats regionLoadStats = ((HRegion)region).getLoadStatistics(); + ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); if(regionLoadStats != null) { regionStats.put(regionSpecifier, regionLoadStats); } @@ -2656,7 +2625,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * * @param rpcc the RPC controller * @param request the mutate request - * @throws ServiceException */ @Override public MutateResponse mutate(final RpcController rpcc,