diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java index 6da808ed639..b658331cf87 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.util; -import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; /** @@ -31,7 +30,6 @@ public class NonceKey { private long nonce; public NonceKey(long group, long nonce) { - assert nonce != HConstants.NO_NONCE; this.group = group; this.nonce = nonce; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5cd27b86e97..82d4bd21440 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -161,6 +161,7 @@ import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; @@ -199,6 +200,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * Regions store data for a certain region of a table. It stores all columns * for each row. A given table consists of one or more Regions. 
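The hbase-common hunk above relaxes NonceKey: after this refactor the write path keys every WALEdit it builds by a NonceKey (see buildWALEdits() below), and an ordinary non-replay batch legitimately carries HConstants.NO_NONCE in both fields, so the constructor can no longer assert nonce != NO_NONCE. A minimal sketch of that grouping behavior, using hypothetical stand-in types rather than the real HBase classes:

    // Sketch only: stand-in for the NonceKey grouping done in BatchOperation.buildWALEdits().
    // Assumes HConstants.NO_NONCE == 0, as in HBase.
    import java.util.ArrayList;
    import java.util.List;

    final class NonceGroupingSketch {
      static final long NO_NONCE = 0;

      static final class NonceKey {
        final long group;
        final long nonce;
        NonceKey(long group, long nonce) {
          // No assertion here: a non-replay batch legitimately passes NO_NONCE.
          this.group = group;
          this.nonce = nonce;
        }
        boolean matches(long g, long n) {
          return group == g && nonce == n;
        }
      }

      /** Starts one "WALEdit" per contiguous run of identical (nonceGroup, nonce) pairs. */
      static List<NonceKey> groupByNonce(long[][] ops) {
        List<NonceKey> keys = new ArrayList<>();
        NonceKey current = null;
        for (long[] op : ops) {
          if (current == null || !current.matches(op[0], op[1])) {
            current = new NonceKey(op[0], op[1]);
            keys.add(current);
          }
        }
        return keys;
      }

      public static void main(String[] args) {
        // Client batch: every mutation shares the batch-wide NO_NONCE key -> one WALEdit.
        long[][] clientBatch = { { NO_NONCE, NO_NONCE }, { NO_NONCE, NO_NONCE } };
        System.out.println(groupByNonce(clientBatch).size()); // prints 1
        // Replay batch: nonces may change between operations -> one WALEdit per run.
        long[][] replayBatch = { { 1, 7 }, { 1, 7 }, { 1, 8 } };
        System.out.println(groupByNonce(replayBatch).size()); // prints 2
      }
    }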
@@ -642,7 +645,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // flushPerChanges is to prevent too many changes in memstore private long flushPerChanges; private long blockingMemStoreSize; - final long threadWakeFrequency; // Used to guard closes final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -757,7 +759,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } this.rsServices = rsServices; - this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); @@ -1271,14 +1272,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return writeRequestsCount.sum(); } - /** - * Update the write request count for this region - * @param i increment - */ - public void updateWriteRequestsCount(long i) { - writeRequestsCount.add(i); - } - @Override public long getMemStoreSize() { return memstoreDataSize.get(); @@ -2218,7 +2211,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return flushcache(force, false, FlushLifeCycleTracker.DUMMY); } - public static interface FlushResult { + public interface FlushResult { enum Result { FLUSHED_NO_COMPACTION_NEEDED, FLUSHED_COMPACTION_NEEDED, @@ -3025,105 +3018,355 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Struct-like class that tracks the progress of a batch operation, accumulating status codes - * and tracking the index at which processing is proceeding. These batch operations may get - * split into mini-batches for processing. + * Class that tracks the progress of batch operations, accumulating status codes and tracking + * the index at which processing is proceeding. These batch operations may get split into + * mini-batches for processing.
*/ private abstract static class BatchOperation { - T[] operations; - int nextIndexToProcess = 0; - OperationStatus[] retCodeDetails; - WALEdit[] walEditsFromCoprocessors; + protected final T[] operations; + protected final OperationStatus[] retCodeDetails; + protected final WALEdit[] walEditsFromCoprocessors; // reference family cell maps directly so coprocessors can mutate them if desired - Map>[] familyCellMaps; - ObservedExceptionsInBatch observedExceptions; - Durability durability; //Durability of the batch (highest durability of all operations) + protected final Map>[] familyCellMaps; - public BatchOperation(T[] operations) { + protected final HRegion region; + protected int nextIndexToProcess = 0; + protected final ObservedExceptionsInBatch observedExceptions; + //Durability of the batch (highest durability of all operations) + protected Durability durability; + + public BatchOperation(final HRegion region, T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; - this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); + this.walEditsFromCoprocessors = new WALEdit[operations.length]; familyCellMaps = new Map[operations.length]; + + this.region = region; observedExceptions = new ObservedExceptionsInBatch(); durability = Durability.USE_DEFAULT; } + /** + * Visitor interface for batch operations + */ + @FunctionalInterface + public interface Visitor { + /** + * @param index operation index + * @return true to continue visiting the remaining entries, false to stop + */ + boolean visit(int index) throws IOException; + } + + /** + * Helper method for visiting pending or all batch operations + */ + public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor) + throws IOException { + assert lastIndexExclusive <= this.size(); + for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) { + if (!pendingOnly || isOperationPending(i)) { + if (!visitor.visit(i)) { + break; + } + } + } + } + public abstract Mutation getMutation(int index); public abstract long getNonceGroup(int index); public abstract long getNonce(int index); - /** This method is potentially expensive and should only be used for non-replay CP path. */ + /** This method is potentially expensive and is useful mostly for the non-replay CP path. */ public abstract Mutation[] getMutationsForCoprocs(); public abstract boolean isInReplay(); - public abstract long getReplaySequenceId(); + public abstract long getOrigLogSeqNum(); + public abstract void startRegionOperation() throws IOException; + public abstract void closeRegionOperation() throws IOException; + + /** + * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs + * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on + * the entire batch and will be called from outside the class to check and prepare the batch. This + * can be implemented by calling the helper method {@link #checkAndPrepareMutation(int, long)} in a + * 'for' loop over mutations. + */ + public abstract void checkAndPrepare() throws IOException; + + /** + * Implement any Put request specific check and prepare logic here. Please refer to + * {@link #checkAndPrepareMutation(Mutation, long)} for how it's used.
+ */ + protected abstract void checkAndPreparePut(final Put p) throws IOException; + + /** + * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell + * count, tags and timestamp for all cells of all operations in a mini-batch. + */ + public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress + miniBatchOp, long timestamp, final List acquiredRowLocks) throws IOException; + + /** + * Write mini-batch operations to MemStore + */ + public abstract WriteEntry writeMiniBatchOperationsToMemStore( + final MiniBatchOperationInProgress miniBatchOp, final WriteEntry writeEntry) + throws IOException; + + protected void writeMiniBatchOperationsToMemStore( + final MiniBatchOperationInProgress miniBatchOp, final long writeNumber) + throws IOException { + MemStoreSizing memStoreAccounting = new MemStoreSizing(); + visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { + // We need to update the sequence id for following reasons. + // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. + // 2) If no WAL, FSWALEntry won't be used + // we use durability of the original mutation for the mutation passed by CP. + if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) { + region.updateSequenceId(familyCellMaps[index].values(), writeNumber); + } + applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting); + return true; + }); + // update memStore size + region.addAndGetMemStoreSize(memStoreAccounting); + } public boolean isDone() { return nextIndexToProcess == operations.length; } + public int size() { + return operations.length; + } + + public boolean isOperationPending(int index) { + return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN; + } + + public List getClusterIds() { + assert size() != 0; + return getMutation(0).getClusterIds(); + } + /** - * Validates each mutation and prepares a batch for write. + * Helper method that checks and prepares only one mutation. This can be used to implement + * {@link #checkAndPrepare()} for entire Batch. * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called - * after prePut()/ preDelete() CP hooks are run for all mutations in a batch. + * after prePut()/ preDelete() CP hooks are run for the mutation */ - public void checkAndPrepare(final HRegion region) throws IOException { - long now = EnvironmentEdgeManager.currentTime(); - for (int i = 0 ; i < operations.length; i++) { - // Skip anything that "ran" already - if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { - Mutation mutation = getMutation(i); + protected void checkAndPrepareMutation(Mutation mutation, final long timestamp) + throws IOException { + region.checkRow(mutation.getRow(), "batchMutate"); + if (mutation instanceof Put) { + // Check the families in the put. If bad, skip this one. 
+ checkAndPreparePut((Put) mutation); + region.checkTimestamps(mutation.getFamilyCellMap(), timestamp); + } else { + region.prepareDelete((Delete) mutation); + } + } - try { - region.checkAndPrepareMutation(mutation, isInReplay(), now); + protected void checkAndPrepareMutation(int index, long timestamp) throws IOException { + Mutation mutation = getMutation(index); + try { + this.checkAndPrepareMutation(mutation, timestamp); - // store the family map reference to allow for mutations - familyCellMaps[i] = mutation.getFamilyCellMap(); - // store durability for the batch (highest durability of all operations in the batch) - Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); - if (tmpDur.ordinal() > durability.ordinal()) { - durability = tmpDur; - } - } catch (NoSuchColumnFamilyException nscf) { - final String msg = "No such column family in batch mutation. "; - if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); - } else { - LOG.warn(msg, nscf); - observedExceptions.sawNoSuchFamily(); - } - retCodeDetails[i] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - } catch (FailedSanityCheckException fsce) { - final String msg = "Batch Mutation did not pass sanity check. "; - if (observedExceptions.hasSeenFailedSanityCheck()) { - LOG.warn(msg + fsce.getMessage()); - } else { - LOG.warn(msg, fsce); - observedExceptions.sawFailedSanityCheck(); - } - retCodeDetails[i] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - } catch (WrongRegionException we) { - final String msg = "Batch mutation had a row that does not belong to this region. "; - if (observedExceptions.hasSeenWrongRegion()) { - LOG.warn(msg + we.getMessage()); - } else { - LOG.warn(msg, we); - observedExceptions.sawWrongRegion(); - } - retCodeDetails[i] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + // store the family map reference to allow for mutations + familyCellMaps[index] = mutation.getFamilyCellMap(); + // store durability for the batch (highest durability of all operations in the batch) + Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); + if (tmpDur.ordinal() > durability.ordinal()) { + durability = tmpDur; + } + } catch (NoSuchColumnFamilyException nscf) { + final String msg = "No such column family in batch mutation. "; + if (observedExceptions.hasSeenNoSuchFamily()) { + LOG.warn(msg + nscf.getMessage()); + } else { + LOG.warn(msg, nscf); + observedExceptions.sawNoSuchFamily(); + } + retCodeDetails[index] = new OperationStatus( + OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + } catch (FailedSanityCheckException fsce) { + final String msg = "Batch Mutation did not pass sanity check. "; + if (observedExceptions.hasSeenFailedSanityCheck()) { + LOG.warn(msg + fsce.getMessage()); + } else { + LOG.warn(msg, fsce); + observedExceptions.sawFailedSanityCheck(); + } + retCodeDetails[index] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + } catch (WrongRegionException we) { + final String msg = "Batch mutation had a row that does not belong to this region. 
"; + if (observedExceptions.hasSeenWrongRegion()) { + LOG.warn(msg + we.getMessage()); + } else { + LOG.warn(msg, we); + observedExceptions.sawWrongRegion(); + } + retCodeDetails[index] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + } + } + + /** + * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which + * a row lock can be acquired. All mutations with locked rows are considered to be + * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch + * is window over {@link BatchOperation} and contains contiguous pending operations. + * + * @param acquiredRowLocks keeps track of rowLocks acquired. + */ + public MiniBatchOperationInProgress lockRowsAndBuildMiniBatch( + List acquiredRowLocks) throws IOException { + int readyToWriteCount = 0; + int lastIndexExclusive = 0; + for (; lastIndexExclusive < size(); lastIndexExclusive++) { + if (!isOperationPending(lastIndexExclusive)) { + continue; + } + Mutation mutation = getMutation(lastIndexExclusive); + // If we haven't got any rows in our batch, we should block to get the next one. + RowLock rowLock = null; + try { + rowLock = region.getRowLockInternal(mutation.getRow(), true); + } catch (TimeoutIOException e) { + // We will retry when other exceptions, but we should stop if we timeout . + throw e; + } catch (IOException ioe) { + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + } + if (rowLock == null) { + // We failed to grab another lock + break; // Stop acquiring more rows for this batch + } else { + acquiredRowLocks.add(rowLock); + } + readyToWriteCount++; + } + return createMiniBatch(lastIndexExclusive, readyToWriteCount); + } + + protected MiniBatchOperationInProgress createMiniBatch(final int lastIndexExclusive, + final int readyToWriteCount) { + return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails, + walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount); + } + + /** + * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are + * present, they are merged to result WALEdit. + */ + public List> buildWALEdits( + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + List> walEdits = new ArrayList<>(); + + visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() { + private Pair curWALEditForNonce; + @Override + public boolean visit(int index) throws IOException { + Mutation m = getMutation(index); + // we use durability of the original mutation for the mutation passed by CP. + if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) { + region.recordMutationWithoutWal(m.getFamilyCellMap()); + return true; } + + // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each. + // Given how nonce keys are originally written, these should be contiguous. + // They don't have to be, it will still work, just write more WALEdits than needed. 
+ long nonceGroup = getNonceGroup(index); + long nonce = getNonce(index); + if (curWALEditForNonce == null || + curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup || + curWALEditForNonce.getFirst().getNonce() != nonce) { + curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce), + new WALEdit(miniBatchOp.getCellCount(), isInReplay())); + walEdits.add(curWALEditForNonce); + } + WALEdit walEdit = curWALEditForNonce.getSecond(); + + // Add WAL edits by CP + WALEdit fromCP = walEditsFromCoprocessors[index]; + if (fromCP != null) { + for (Cell cell : fromCP.getCells()) { + walEdit.add(cell); + } + } + addFamilyMapToWALEdit(familyCellMaps[index], walEdit); + + return true; + } + }); + return walEdits; + } + + /** + * This method completes mini-batch operations by calling the postBatchMutate() CP hook (if + * required) and completing mvcc. + */ + public void completeMiniBatchOperations( + final MiniBatchOperationInProgress miniBatchOp, final WriteEntry writeEntry) + throws IOException { + if (writeEntry != null) { + region.mvcc.completeAndWait(writeEntry); + } + } + + public void doPostOpCleanupForMiniBatch( + final MiniBatchOperationInProgress miniBatchOp, final WALEdit walEdit, + boolean success) throws IOException {} + + /** + * Atomically apply the given map of family->edits to the memstore. + * This handles the consistency control on its own, but the caller + * should already have locked updatesLock.readLock(). This also does + * not check the families for validity. + * + * @param familyMap Map of Cells by family + */ + protected void applyFamilyMapToMemStore(Map> familyMap, + MemStoreSizing memstoreAccounting) throws IOException { + for (Map.Entry> e : familyMap.entrySet()) { + byte[] family = e.getKey(); + List cells = e.getValue(); + assert cells instanceof RandomAccess; + region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting); + } + } + + /** + * Append the given map of family->edits to a WALEdit data structure. + * This does not write to the WAL itself. + * @param familyMap map of family->edits + * @param walEdit the destination entry to append into + */ + private void addFamilyMapToWALEdit(Map> familyMap, + WALEdit walEdit) { + for (List edits : familyMap.values()) { + assert edits instanceof RandomAccess; + int listSize = edits.size(); + for (int i=0; i < listSize; i++) { + Cell cell = edits.get(i); + walEdit.add(cell); } } } } + /** + * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most + * of the logic is the same.
+ */ private static class MutationBatchOperation extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) { - super(operations); + public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup, + long nonce) { + super(region, operations); this.nonceGroup = nonceGroup; this.nonce = nonce; } @@ -3154,16 +3397,279 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public long getReplaySequenceId() { - return 0; + public long getOrigLogSeqNum() { + return WALKey.NO_SEQUENCE_ID; + } + + @Override + public void startRegionOperation() throws IOException { + region.startRegionOperation(Operation.BATCH_MUTATE); + } + + @Override + public void closeRegionOperation() throws IOException { + region.closeRegionOperation(Operation.BATCH_MUTATE); + } + + @Override + public void checkAndPreparePut(Put p) throws IOException { + region.checkFamilies(p.getFamilyCellMap().keySet()); + } + + @Override + public void checkAndPrepare() throws IOException { + final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes + visitBatchOperations(true, this.size(), new Visitor() { + private long now = EnvironmentEdgeManager.currentTime(); + private WALEdit walEdit; + @Override + public boolean visit(int index) throws IOException { + // Run coprocessor pre hook outside of locks to avoid deadlock + if (region.coprocessorHost != null) { + if (walEdit == null) { + walEdit = new WALEdit(); + } + callPreMutateCPHook(index, walEdit, metrics); + if (!walEdit.isEmpty()) { + walEditsFromCoprocessors[index] = walEdit; + walEdit = null; + } + } + if (isOperationPending(index)) { + // TODO: Currently validation is done with current time before acquiring locks and + // updates are done with different timestamps after acquiring locks. This behavior is + // inherited from the code prior to this change. Can this be changed? + checkAndPrepareMutation(index, now); + } + return true; + } + }); + + // FIXME: we may update metrics twice! here for all operations bypassed by CP and later in + // normal processing. + // Update metrics in same way as it is done when we go the normal processing route (we now + // update general metrics though a Coprocessor did the work). + if (region.metricsRegion != null) { + if (metrics[0] > 0) { + // There were some Puts in the batch. + region.metricsRegion.updatePut(); + } + if (metrics[1] > 0) { + // There were some Deletes in the batch. 
+ region.metricsRegion.updateDelete(); + } + } + } + + @Override + public void prepareMiniBatchOperations(MiniBatchOperationInProgress miniBatchOp, + long timestamp, final List acquiredRowLocks) throws IOException { + byte[] byteTS = Bytes.toBytes(timestamp); + visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { + Mutation mutation = getMutation(index); + if (mutation instanceof Put) { + region.updateCellTimestamps(familyCellMaps[index].values(), byteTS); + miniBatchOp.incrementNumOfPuts(); + } else { + region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS); + miniBatchOp.incrementNumOfDeletes(); + } + region.rewriteCellTags(familyCellMaps[index], mutation); + + // update cell count + if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { + for (List cells : mutation.getFamilyCellMap().values()) { + miniBatchOp.addCellCount(cells.size()); + } + } + + WALEdit fromCP = walEditsFromCoprocessors[index]; + if (fromCP != null) { + miniBatchOp.addCellCount(fromCP.size()); + } + return true; + }); + + if (region.coprocessorHost != null) { + // calling the pre CP hook for batch mutation + region.coprocessorHost.preBatchMutate(miniBatchOp); + checkAndMergeCPMutations(miniBatchOp, acquiredRowLocks, timestamp); + } + } + + @Override + public List> buildWALEdits(final MiniBatchOperationInProgress + miniBatchOp) throws IOException { + List> walEdits = super.buildWALEdits(miniBatchOp); + // for MutationBatchOperation, more than one nonce is not allowed + if (walEdits.size() > 1) { + throw new IOException("Found multiple nonce keys per batch!"); + } + return walEdits; + } + + @Override + public WriteEntry writeMiniBatchOperationsToMemStore( + final MiniBatchOperationInProgress miniBatchOp, @Nullable WriteEntry writeEntry) + throws IOException { + if (writeEntry == null) { + writeEntry = region.mvcc.begin(); + } + super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber()); + return writeEntry; + } + + @Override + public void completeMiniBatchOperations( + final MiniBatchOperationInProgress miniBatchOp, final WriteEntry writeEntry) + throws IOException { + // TODO: can it be done after completing mvcc? + // calling the post CP hook for batch mutation + if (region.coprocessorHost != null) { + region.coprocessorHost.postBatchMutate(miniBatchOp); + } + super.completeMiniBatchOperations(miniBatchOp, writeEntry); + } + + @Override + public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress miniBatchOp, + final WALEdit walEdit, boolean success) throws IOException { + if (miniBatchOp != null) { + // synced so that the coprocessor contract is adhered to. + if (region.coprocessorHost != null) { + visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> { + // only for successful puts + if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) { + Mutation m = getMutation(i); + if (m instanceof Put) { + region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); + } else { + region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); + } + } + return true; + }); + } + + // See if the column families were consistent through the whole thing. + // if they were then keep them. If they were not then pass a null. + // null will be treated as unknown. + // Total time taken might be involving Puts and Deletes. + // Split the time for puts and deletes based on the total number of Puts and Deletes. 
+ if (region.metricsRegion != null) { + if (miniBatchOp.getNumOfPuts() > 0) { + // There were some Puts in the batch. + region.metricsRegion.updatePut(); + } + if (miniBatchOp.getNumOfDeletes() > 0) { + // There were some Deletes in the batch. + region.metricsRegion.updateDelete(); + } + } + } + + if (region.coprocessorHost != null) { + // call the coprocessor hook to do any finalization steps after the put is done + region.coprocessorHost.postBatchMutateIndispensably( + miniBatchOp != null ? miniBatchOp : createMiniBatch(size(), 0), success); + } + } + + /** + * Runs prePut/ preDelete coprocessor hook for input mutation in a batch + * @param metrics Array of 2 ints. index 0: count of puts and index 1: count of deletes + */ + private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] metrics) + throws IOException { + Mutation m = getMutation(index); + if (m instanceof Put) { + if (region.coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { + // pre hook says skip this Put + // mark as success and skip in doMiniBatchMutation + metrics[0]++; + retCodeDetails[index] = OperationStatus.SUCCESS; + } + } else if (m instanceof Delete) { + Delete curDel = (Delete) m; + if (curDel.getFamilyCellMap().isEmpty()) { + // handle deleting a row case + // TODO: prepareDelete() has been called twice, before and after preDelete() CP hook. + // Can this be avoided? + region.prepareDelete(curDel); + } + if (region.coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { + // pre hook says skip this Delete + // mark as success and skip in doMiniBatchMutation + metrics[1]++; + retCodeDetails[index] = OperationStatus.SUCCESS; + } + } else { + // In case of passing Append mutations along with the Puts and Deletes in batchMutate + // mark the operation return code as failure so that it will not be considered in + // the doMiniBatchMutation + retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, + "Put/Delete mutations only supported in batchMutate() now"); + } + } + + private void checkAndMergeCPMutations(final MiniBatchOperationInProgress miniBatchOp, + final List acquiredRowLocks, final long timestamp) throws IOException { + visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> { + // we pass (i - firstIndex) below since the call expects a relative index + Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - nextIndexToProcess); + if (cpMutations == null) { + return true; + } + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. + Mutation mutation = getMutation(i); + for (Mutation cpMutation : cpMutations) { + this.checkAndPrepareMutation(cpMutation, timestamp); + + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true)); + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + Map> cpFamilyMap = cpMutation.getFamilyCellMap(); + // will get added to the memStore later + mergeFamilyMaps(familyCellMaps[i], cpFamilyMap); + + // The durability of returned mutation is replaced by the corresponding mutation. + // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the + // cells of returned mutation. 
+ if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { + for (List cells : cpFamilyMap.values()) { + miniBatchOp.addCellCount(cells.size()); + } + } + } + return true; + }); + } + + private void mergeFamilyMaps(Map> familyMap, + Map> toBeMerged) { + for (Map.Entry> entry : toBeMerged.entrySet()) { + List cells = familyMap.get(entry.getKey()); + if (cells == null) { + familyMap.put(entry.getKey(), entry.getValue()); + } else { + cells.addAll(entry.getValue()); + } + } } } + /** + * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most + * of the logic is the same. + */ private static class ReplayBatchOperation extends BatchOperation { - private long replaySeqId = 0; - public ReplayBatchOperation(MutationReplay[] operations, long seqId) { - super(operations); - this.replaySeqId = seqId; + private long origLogSeqNum = 0; + public ReplayBatchOperation(final HRegion region, MutationReplay[] operations, + long origLogSeqNum) { + super(region, operations); + this.origLogSeqNum = origLogSeqNum; } @Override @@ -3183,8 +3689,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Mutation[] getMutationsForCoprocs() { - assert false; - throw new RuntimeException("Should not be called for replay batch"); + return null; } @Override @@ -3193,8 +3698,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override - public long getReplaySequenceId() { - return this.replaySeqId; + public long getOrigLogSeqNum() { + return this.origLogSeqNum; + } + + @Override + public void startRegionOperation() throws IOException { + region.startRegionOperation(Operation.REPLAY_BATCH_MUTATE); + } + + @Override + public void closeRegionOperation() throws IOException { + region.closeRegionOperation(Operation.REPLAY_BATCH_MUTATE); + } + + /** + * During replay, there may exist column families that were removed between region server + * failure and replay + */ + @Override + protected void checkAndPreparePut(Put p) throws IOException { + Map> familyCellMap = p.getFamilyCellMap(); + List nonExistentList = null; + for (byte[] family : familyCellMap.keySet()) { + if (!region.htableDescriptor.hasColumnFamily(family)) { + if (nonExistentList == null) { + nonExistentList = new ArrayList<>(); + } + nonExistentList.add(family); + } + } + if (nonExistentList != null) { + for (byte[] family : nonExistentList) { + // Perhaps schema was changed between crash and replay + LOG.info("No family for " + Bytes.toString(family) + " omit from replay."); + familyCellMap.remove(family); + } + } + } + + @Override + public void checkAndPrepare() throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + visitBatchOperations(true, this.size(), (int index) -> { + checkAndPrepareMutation(index, now); + return true; + }); + } + + @Override + public void prepareMiniBatchOperations(MiniBatchOperationInProgress miniBatchOp, + long timestamp, final List acquiredRowLocks) throws IOException { + visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { + // update cell count + for (List cells : getMutation(index).getFamilyCellMap().values()) { + miniBatchOp.addCellCount(cells.size()); + } + return true; + }); + } + + @Override + public WriteEntry writeMiniBatchOperationsToMemStore( + final MiniBatchOperationInProgress miniBatchOp, final WriteEntry writeEntry) + throws IOException { + super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum()); + return 
writeEntry; + } + + @Override + public void completeMiniBatchOperations( + final MiniBatchOperationInProgress miniBatchOp, final WriteEntry writeEntry) + throws IOException { + super.completeMiniBatchOperations(miniBatchOp, writeEntry); + region.mvcc.advanceTo(getOrigLogSeqNum()); } } @@ -3204,7 +3781,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce)); } public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { @@ -3232,14 +3809,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return statuses; } - return batchMutate(new ReplayBatchOperation(mutations, replaySeqId)); + return batchMutate(new ReplayBatchOperation(this, mutations, replaySeqId)); } /** * Perform a batch of mutations. + * * It supports only Put and Delete mutations and will ignore other types passed. Operations in * a batch are stored with highest durability specified of for all operations in a batch, * except for {@link Durability#SKIP_WAL}. + * + *

 This function is called from {@link #batchReplay(MutationReplay[], long)} with a + * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[], long, long)} with a + * {@link MutationBatchOperation} instance as an argument. As the processing of a replay batch + * and a mutation batch is very similar, a lot of code is shared by providing generic methods in + * the base class {@link BatchOperation}. The logic for this method and + * {@link #doMiniBatchMutate(BatchOperation)} is implemented using methods in the base class, which + * are overridden by derived classes to implement special behavior. + * * @param batchOp contains the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. @@ -3247,8 +3834,7 @@ */ OperationStatus[] batchMutate(BatchOperation batchOp) throws IOException { boolean initialized = false; - Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; - startRegionOperation(op); + batchOp.startRegionOperation(); try { while (!batchOp.isDone()) { if (!batchOp.isInReplay()) { @@ -3257,12 +3843,10 @@ checkResources(); if (!initialized) { - this.writeRequestsCount.add(batchOp.operations.length); - if (!batchOp.isInReplay()) { - callPreMutateCPHooks(batchOp); - } - // validate and prepare batch for write, after CP pre-hooks - batchOp.checkAndPrepare(this); + this.writeRequestsCount.add(batchOp.size()); + // validate and prepare batch for write, for MutationBatchOperation it also calls CP + // prePut()/ preDelete() hooks + batchOp.checkAndPrepare(); initialized = true; } doMiniBatchMutate(batchOp); @@ -3270,296 +3854,75 @@ requestFlushIfNeeded(newSize); } } finally { - closeRegionOperation(op); + batchOp.closeRegionOperation(); } return batchOp.retCodeDetails; } - /** - * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch. 
- * @param batchOp - */ - private void callPreMutateCPHooks(BatchOperation batchOp) throws IOException { - if (coprocessorHost == null) { - return; - } - /* Run coprocessor pre hook outside of locks to avoid deadlock */ - WALEdit walEdit = new WALEdit(); - int noOfPuts = 0; - int noOfDeletes = 0; - for (int i = 0 ; i < batchOp.operations.length; i++) { - Mutation m = batchOp.getMutation(i); - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { - // pre hook says skip this Put - // mark as success and skip in doMiniBatchMutation - noOfPuts++; - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - } - } else if (m instanceof Delete) { - Delete curDel = (Delete) m; - if (curDel.getFamilyCellMap().isEmpty()) { - // handle deleting a row case - prepareDelete(curDel); - } - if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { - // pre hook says skip this Delete - // mark as success and skip in doMiniBatchMutation - noOfDeletes++; - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - } - } else { - // In case of passing Append mutations along with the Puts and Deletes in batchMutate - // mark the operation return code as failure so that it will not be considered in - // the doMiniBatchMutation - batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, - "Put/Delete mutations only supported in batchMutate() now"); - } - if (!walEdit.isEmpty()) { - batchOp.walEditsFromCoprocessors[i] = walEdit; - walEdit = new WALEdit(); - } - } - // Update metrics in same way as it is done when we go the normal processing route (we now - // update general metrics though a Coprocessor did the work). - if (noOfPuts > 0) { - // There were some Puts in the batch. - if (this.metricsRegion != null) { - this.metricsRegion.updatePut(); - } - } - if (noOfDeletes > 0) { - // There were some Deletes in the batch. - if (this.metricsRegion != null) { - this.metricsRegion.updateDelete(); - } - } - } - /** * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} * In here we also handle replay of edits on region recover. * @return Change in size brought about by applying batchOp */ - // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { - boolean replay = batchOp.isInReplay(); - long currentNonceGroup = HConstants.NO_NONCE; - long currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = null; - boolean locked = false; - // We try to set up a batch in the range [firstIndex,lastIndexExclusive) - int firstIndex = batchOp.nextIndexToProcess; - int lastIndexExclusive = firstIndex; boolean success = false; - int noOfPuts = 0; - int noOfDeletes = 0; + WALEdit walEdit = null; WriteEntry writeEntry = null; - int cellCount = 0; + boolean locked = false; + // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive) + MiniBatchOperationInProgress miniBatchOp = null; /** Keep track of the locks we hold so we can release them in finally clause */ - List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); - MemStoreSizing memStoreAccounting = new MemStoreSizing(); + List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size()); try { - // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. 
- int numReadyToWrite = 0; - for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) { - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - // If we haven't got any rows in our batch, we should block to get the next one. - RowLock rowLock = null; - try { - rowLock = getRowLockInternal(mutation.getRow(), true); - } catch (TimeoutIOException e) { - // We will retry when other exceptions, but we should stop if we timeout . - throw e; - } catch (IOException ioe) { - LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); - } - if (rowLock == null) { - // We failed to grab another lock - break; // Stop acquiring more rows for this batch - } else { - acquiredRowLocks.add(rowLock); - } - - numReadyToWrite++; - if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { - for (List cells : mutation.getFamilyCellMap().values()) { - cellCount += cells.size(); - } - } - } + // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with + // locked rows + miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks); // We've now grabbed as many mutations off the list as we can - // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? - if (numReadyToWrite <= 0) { + // Ensure we acquire at least one. + if (miniBatchOp.getReadyToWriteCount() <= 0) { + // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? return; } - // STEP 2. Update any LATEST_TIMESTAMP timestamps + lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount()); + locked = true; + + // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp long now = EnvironmentEdgeManager.currentTime(); - if (!replay) { - byte[] byteNow = Bytes.toBytes(now); - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // skip invalid - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } - - Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Put) { - updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow); - noOfPuts++; - } else { - prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow); - noOfDeletes++; - } - rewriteCellTags(batchOp.familyCellMaps[i], mutation); - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - cellCount += fromCP.size(); - } - } - } - lock(this.updatesLock.readLock(), numReadyToWrite); - locked = true; - - // calling the pre CP hook for batch mutation - if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.preBatchMutate(miniBatchOp); - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. 
- continue; - } - // we pass (i - firstIndex) below since the call expects a relative index - Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); - if (cpMutations == null) { - continue; - } - Mutation mutation = batchOp.getMutation(i); - boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; - // Else Coprocessor added more Mutations corresponding to the Mutation at this index. - for (int j = 0; j < cpMutations.length; j++) { - Mutation cpMutation = cpMutations[j]; - checkAndPrepareMutation(cpMutation, replay, now); - - // Acquire row locks. If not, the whole batch will fail. - acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - - // Returned mutations from coprocessor correspond to the Mutation at index i. We can - // directly add the cells from those mutations to the familyMaps of this mutation. - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - // will get added to the memStore later - mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); - - // The durability of returned mutation is replaced by the corresponding mutation. - // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the - // cells of returned mutation. - if (!skipWal) { - for (List cells : cpFamilyMap.values()) { - cellCount += cells.size(); - } - } - } - } - } + batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit - walEdit = new WALEdit(cellCount, replay); - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // Skip puts that were determined to be invalid during preprocessing - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; + List> walEdits = batchOp.buildWALEdits(miniBatchOp); + + // STEP 4. Append the WALEdits to WAL and sync. + for(Iterator> it = walEdits.iterator(); it.hasNext();) { + Pair nonceKeyWALEditPair = it.next(); + walEdit = nonceKeyWALEditPair.getSecond(); + NonceKey nonceKey = nonceKeyWALEditPair.getFirst(); + + if (walEdit != null && !walEdit.isEmpty()) { + writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now, + nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum()); } - Mutation m = batchOp.getMutation(i); - // we use durability of the original mutation for the mutation passed by CP. - if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) { - recordMutationWithoutWal(m.getFamilyCellMap()); - continue; + // STEP 6. Complete mvcc for all but last writeEntry (for replay case) + if (it.hasNext() && writeEntry != null) { + mvcc.complete(writeEntry); + writeEntry = null; } - - long nonceGroup = batchOp.getNonceGroup(i); - long nonce = batchOp.getNonce(i); - // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. - // Given how nonces are originally written, these should be contiguous. - // They don't have to be, it will still work, just write more WALEdits than needed. 
- if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - // Write what we have so far for nonces out to WAL - appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); - walEdit = new WALEdit(cellCount, replay); - currentNonceGroup = nonceGroup; - currentNonce = nonce; - } - - // Add WAL edits by CP - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - for (Cell cell : fromCP.getCells()) { - walEdit.add(cell); - } - } - addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit); - } - - // STEP 4. Append the final edit to WAL and sync. - Mutation mutation = batchOp.getMutation(firstIndex); - writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now, - currentNonceGroup, currentNonce, - replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID); - if (!replay && writeEntry == null) { - // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC - // transaction to get sequence id. - writeEntry = mvcc.begin(); } // STEP 5. Write back to memStore - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; - } - // We need to update the sequence id for following reasons. - // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. - // 2) If no WAL, FSWALEntry won't be used - // we use durability of the original mutation for the mutation passed by CP. - boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; - if (updateSeqId) { - this.updateSequenceId(batchOp.familyCellMaps[i].values(), - replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); - } - applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting); - } - - // update memstore size - this.addAndGetMemStoreSize(memStoreAccounting); - - // calling the post CP hook for batch mutation - if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutate(miniBatchOp); - } - - // STEP 6. Complete mvcc. - if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - writeEntry = null; - } - if (replay) { - this.mvcc.advanceTo(batchOp.getReplaySequenceId()); - } + // NOTE: writeEntry can be null here + writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry); + // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and + // complete mvcc for last writeEntry + batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry); + writeEntry = null; success = true; } finally { // Call complete rather than completeAndWait because we probably had error if walKey != null @@ -3570,122 +3933,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } releaseRowLocks(acquiredRowLocks); - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { - batchOp.retCodeDetails[i] = success? OperationStatus.SUCCESS : OperationStatus.FAILURE; - } - } + final int finalLastIndexExclusive = + miniBatchOp != null ? 
miniBatchOp.getLastIndexExclusive() : batchOp.size(); + final boolean finalSuccess = success; + batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> { + batchOp.retCodeDetails[i] = + finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE; + return true; + }); - // synced so that the coprocessor contract is adhered to. - if (!replay && coprocessorHost != null) { - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // only for successful puts - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.SUCCESS) { - continue; - } - Mutation m = batchOp.getMutation(i); - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); - } else { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); - } - } - } + batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess); - // See if the column families were consistent through the whole thing. - // if they were then keep them. If they were not then pass a null. - // null will be treated as unknown. - // Total time taken might be involving Puts and Deletes. - // Split the time for puts and deletes based on the total number of Puts and Deletes. - - if (noOfPuts > 0) { - // There were some Puts in the batch. - if (this.metricsRegion != null) { - this.metricsRegion.updatePut(); - } - } - if (noOfDeletes > 0) { - // There were some Deletes in the batch. - if (this.metricsRegion != null) { - this.metricsRegion.updateDelete(); - } - } - - if (coprocessorHost != null && !batchOp.isInReplay()) { - // call the coprocessor hook to do any finalization steps - // after the put is done - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); - } - - batchOp.nextIndexToProcess = lastIndexExclusive; - } - } - - private void mergeFamilyMaps(Map> familyMap, - Map> toBeMerged) { - for (Map.Entry> entry : toBeMerged.entrySet()) { - List cells = familyMap.get(entry.getKey()); - if (cells == null) { - familyMap.put(entry.getKey(), entry.getValue()); - } else { - cells.addAll(entry.getValue()); - } - } - } - - private void appendCurrentNonces(final Mutation mutation, final boolean replay, - final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) - throws IOException { - if (walEdit.isEmpty()) return; - if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), - currentNonceGroup, currentNonce, mvcc, this.getReplicationScope()); - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Complete the mvcc transaction started down in append else it will block others - this.mvcc.complete(walKey.getWriteEntry()); - } - - private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now) - throws IOException { - checkRow(mutation.getRow(), "doMiniBatchMutation"); - if (mutation instanceof Put) { - // Check the families in the put. If bad, skip this one. 
- if (replay) { - removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap()); - } else { - checkFamilies(mutation.getFamilyCellMap().keySet()); - } - checkTimestamps(mutation.getFamilyCellMap(), now); - } else { - prepareDelete((Delete)mutation); - } - } - - /** - * During replay, there could exist column families which are removed between region server - * failure and replay - */ - private void removeNonExistentColumnFamilyForReplay(final Map> familyMap) { - List nonExistentList = null; - for (byte[] family : familyMap.keySet()) { - if (!this.htableDescriptor.hasColumnFamily(family)) { - if (nonExistentList == null) { - nonExistentList = new ArrayList<>(); - } - nonExistentList.add(family); - } - } - if (nonExistentList != null) { - for (byte[] family : nonExistentList) { - // Perhaps schema was changed between crash and replay - LOG.info("No family for " + Bytes.toString(family) + " omit from reply."); - familyMap.remove(family); - } + batchOp.nextIndexToProcess = finalLastIndexExclusive; } } @@ -4003,25 +4262,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi doBatchMutate(p); } - /* - * Atomically apply the given map of family->edits to the memstore. - * This handles the consistency control on its own, but the caller - * should already have locked updatesLock.readLock(). This also does - * not check the families for validity. - * - * @param familyMap Map of Cells by family - * @param memstoreSize - */ - private void applyFamilyMapToMemStore(Map> familyMap, - MemStoreSizing memstoreAccounting) throws IOException { - for (Map.Entry> e : familyMap.entrySet()) { - byte[] family = e.getKey(); - List cells = e.getValue(); - assert cells instanceof RandomAccess; - applyToMemStore(getStore(family), cells, false, memstoreAccounting); - } - } - /** * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be * set; when set we will run operations that make sense in the increment/append scenario @@ -4090,24 +4330,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** - * Append the given map of family->edits to a WALEdit data structure. - * This does not write to the WAL itself. - * @param familyMap map of family->edits - * @param walEdit the destination entry to append into - */ - private void addFamilyMapToWALEdit(Map> familyMap, - WALEdit walEdit) { - for (List edits : familyMap.values()) { - assert edits instanceof RandomAccess; - int listSize = edits.size(); - for (int i=0; i < listSize; i++) { - Cell cell = edits.get(i); - walEdit.add(cell); - } - } - } - /* * @param size * @return True if size is over the flush threshold @@ -5471,8 +5693,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void releaseRowLocks(List rowLocks) { if (rowLocks != null) { - for (int i = 0; i < rowLocks.size(); i++) { - rowLocks.get(i).release(); + for (RowLock rowLock : rowLocks) { + rowLock.release(); } rowLocks.clear(); } @@ -5626,7 +5848,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * bulkLoadHFile() to perform any necessary * pre/post processing of a given bulkload call */ - public static interface BulkLoadListener { + public interface BulkLoadListener { /** * Called before an HFile is actually loaded * @param family family being loaded to @@ -6081,7 +6303,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // to handle scan or get operation. 
moreValues = nextInternal(outResults, scannerContext); } else { - List tmpList = new ArrayList(); + List tmpList = new ArrayList<>(); moreValues = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } @@ -6860,46 +7082,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor); } - /** - * Create a daughter region from given a temp directory with the region data. - * @param hri Spec. for daughter region to open. - * @throws IOException - */ - public HRegion createDaughterRegionFromSplits(final RegionInfo hri) throws IOException { - // Move the files from the temporary .splits to the final /table/region directory - fs.commitDaughterRegion(hri); - - // Create the daughter HRegion instance - HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(), - this.getBaseConf(), hri, this.getTableDescriptor(), rsServices); - r.readRequestsCount.add(this.getReadRequestsCount() / 2); - r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount() / 2); - r.writeRequestsCount.add(this.getWriteRequestsCount() / 2); - return r; - } - - /** - * Create a merged region given a temp directory with the region data. - * @param region_b another merging region - * @return merged HRegion - * @throws IOException - */ - HRegion createMergedRegionFromMerges(final RegionInfo mergedRegionInfo, - final HRegion region_b) throws IOException { - HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), - fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo, - this.getTableDescriptor(), this.rsServices); - r.readRequestsCount.add(this.getReadRequestsCount() - + region_b.getReadRequestsCount()); - r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount() - + region_b.getFilteredReadRequestsCount()); - r.writeRequestsCount.add(this.getWriteRequestsCount() - - + region_b.getWriteRequestsCount()); - this.fs.commitMergedRegion(mergedRegionInfo); - return r; - } - /** * Computes the Path of the HRegion * @@ -6960,7 +7142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } - void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { + void prepareGet(final Get get) throws IOException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { @@ -7396,32 +7578,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return writeEntry associated with this append */ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, - long now, long nonceGroup, long nonce, long replaySeqId) throws IOException { - Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID, + long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException { + Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), + "WALEdit is null or empty!"); + Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID, "Invalid replay sequence Id for replay WALEdit!"); + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + WALKey walKey = walEdit.isReplay() ? 
@@ -7396,32 +7578,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return writeEntry associated with this append
    */
   private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
-      long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
-    Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
+      long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
+    Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
+        "WALEdit is null or empty!");
+    Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID,
         "Invalid replay sequence Id for replay WALEdit!");
+    // Using default cluster id, as this can only happen in the originating cluster.
+    // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+    // here instead of WALKey directly to support legacy coprocessors.
+    WALKey walKey = walEdit.isReplay() ?
+        new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
+            nonceGroup, nonce, mvcc) :
+        new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
+            nonceGroup, nonce, mvcc, this.getReplicationScope());
+    if (walEdit.isReplay()) {
+      walKey.setOrigLogSeqNum(origLogSeqNum);
+    }
     WriteEntry writeEntry = null;
-    if (!walEdit.isEmpty()) {
-      // Using default cluster id, as this can only happen in the originating cluster.
-      // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
-      // here instead of WALKey directly to support legacy coprocessors.
-      WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
-          nonce, mvcc) :
-          new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
-              nonceGroup, nonce, mvcc, this.getReplicationScope());
-      if (walEdit.isReplay()) {
-        walKey.setOrigLogSeqNum(replaySeqId);
+    try {
+      long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
+      // Call sync on our edit.
+      if (txid != 0) {
+        sync(txid, durability);
       }
-      try {
-        long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-        // Call sync on our edit.
-        if (txid != 0) sync(txid, durability);
-        writeEntry = walKey.getWriteEntry();
-      } catch (IOException ioe) {
-        if (walKey != null) mvcc.complete(walKey.getWriteEntry());
-        throw ioe;
+      writeEntry = walKey.getWriteEntry();
+    } catch (IOException ioe) {
+      if (walKey != null) {
+        mvcc.complete(walKey.getWriteEntry());
       }
+      throw ioe;
     }
     return writeEntry;
   }
@@ -7637,7 +7823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Sorted list of cells using comparator
    */
   private static List<Cell> sort(List<Cell> cells, final CellComparator comparator) {
-    Collections.sort(cells, comparator);
+    cells.sort(comparator);
     return cells;
   }
 
@@ -7658,7 +7844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       51 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (15 * Bytes.SIZEOF_LONG) +
+      (14 * Bytes.SIZEOF_LONG) +
       6 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
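The Collections.sort change in sort() is a small modernization: since Java 8, Collections.sort(list, comparator) simply delegates to List.sort(comparator), so the direct call is equivalent with one less hop. A self-contained, JDK-only illustration:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ListSortExample {
  public static void main(String[] args) {
    List<String> cells = new ArrayList<>(Arrays.asList("b", "c", "a"));
    // Equivalent to Collections.sort(cells, Comparator.naturalOrder()),
    // which has delegated to List.sort since Java 8.
    cells.sort(Comparator.naturalOrder());
    System.out.println(cells); // [a, b, c]
  }
}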
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 56a97e08355..ba847a1e12f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -40,13 +41,22 @@ public class MiniBatchOperationInProgress<T> {
   private final int firstIndex;
   private final int lastIndexExclusive;
 
+  private int readyToWriteCount = 0;
+  private int cellCount = 0;
+  private int numOfPuts = 0;
+  private int numOfDeletes = 0;
+
   public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
-      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
+      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
+      int readyToWriteCount) {
+    Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
     this.operations = operations;
     this.retCodeDetails = retCodeDetails;
     this.walEditsFromCoprocessors = walEditsFromCoprocessors;
     this.firstIndex = firstIndex;
     this.lastIndexExclusive = lastIndexExclusive;
+    this.readyToWriteCount = readyToWriteCount;
   }
 
   /**
@@ -127,4 +137,36 @@ public class MiniBatchOperationInProgress<T> {
     return operationsFromCoprocessors == null ? null :
         operationsFromCoprocessors[getAbsoluteIndex(index)];
   }
+
+  public int getReadyToWriteCount() {
+    return readyToWriteCount;
+  }
+
+  public int getLastIndexExclusive() {
+    return lastIndexExclusive;
+  }
+
+  public int getCellCount() {
+    return cellCount;
+  }
+
+  public void addCellCount(int cellCount) {
+    this.cellCount += cellCount;
+  }
+
+  public int getNumOfPuts() {
+    return numOfPuts;
+  }
+
+  public void incrementNumOfPuts() {
+    this.numOfPuts += 1;
+  }
+
+  public int getNumOfDeletes() {
+    return numOfDeletes;
+  }
+
+  public void incrementNumOfDeletes() {
+    this.numOfDeletes += 1;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
index c8e99405073..0d9d149ed7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
@@ -137,7 +137,7 @@ MultiRowMutationProcessorResponse> {
     if (coprocessorHost != null) {
       miniBatch = new MiniBatchOperationInProgress<>(
           mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
-          mutations.size());
+          mutations.size(), mutations.size());
       coprocessorHost.preBatchMutate(miniBatch);
     }
     // Apply edits to a single WALEdit
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
index 4a593794e48..c3472b511ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
@@ -44,7 +44,7 @@ public class TestMiniBatchOperationInProgress {
     }
     MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
         new MiniBatchOperationInProgress<>(operations, retCodeDetails,
-        walEditsFromCoprocessors, 0, 5);
+        walEditsFromCoprocessors, 0, 5, 5);
 
     assertEquals(5, miniBatch.size());
     assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
@@ -69,7 +69,7 @@ public class TestMiniBatchOperationInProgress {
     }
 
     miniBatch = new MiniBatchOperationInProgress<>(operations,
-        retCodeDetails, walEditsFromCoprocessors, 7, 10);
+        retCodeDetails, walEditsFromCoprocessors, 7, 10, 3);
     try {
       miniBatch.setWalEdit(-1, new WALEdit());
       fail("Should throw Exception while accessing out of range");
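The widened constructor is guarded by the new Preconditions check: readyToWriteCount may not exceed the number of operations in the half-open range [firstIndex, lastIndexExclusive). A sketch of one valid and one invalid call; the array contents are elided and the helper method is illustrative only.

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.wal.WALEdit;

public class MiniBatchConstructorExample {
  static void demo(Mutation[] ops, OperationStatus[] statuses, WALEdit[] walEdits) {
    // Valid: five operations in [0, 5), all five ready to write.
    MiniBatchOperationInProgress<Mutation> ok =
        new MiniBatchOperationInProgress<>(ops, statuses, walEdits, 0, 5, 5);

    // Invalid: 6 > (5 - 0). The new Preconditions.checkArgument throws
    // IllegalArgumentException before any field is assigned.
    MiniBatchOperationInProgress<Mutation> bad =
        new MiniBatchOperationInProgress<>(ops, statuses, walEdits, 0, 5, 6);
  }
}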
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
index eb336fe5879..2fd3909344b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
@@ -867,7 +867,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
       @Override
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preBatchMutate(ObserverContextImpl.createAndPrepare(RCP_ENV),
-          new MiniBatchOperationInProgress<>(null, null, null, 0, 0));
+          new MiniBatchOperationInProgress<>(null, null, null, 0, 0, 0));
         return null;
       }
     }, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);
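None of the hunks above show how the new put/delete/cell counters get fed; the loop below is a guess at the caller-side tally (the real wiring presumably lives in HRegion's batch path). It uses only methods added by this patch plus Mutation#size(), which returns the mutation's cell count.

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

public class MiniBatchCounterExample {
  // Hypothetical tally over a mini-batch of mutations.
  static void tally(MiniBatchOperationInProgress<Mutation> miniBatch) {
    for (int i = 0; i < miniBatch.size(); i++) {
      Mutation m = miniBatch.getOperation(i);
      if (m instanceof Put) {
        miniBatch.incrementNumOfPuts();
      } else if (m instanceof Delete) {
        miniBatch.incrementNumOfDeletes();
      }
      miniBatch.addCellCount(m.size()); // Mutation#size() = number of cells
    }
  }
}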