From 08dff49620adee39714fe8466c66a159489599be Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Fri, 6 Oct 2017 15:40:05 -0700 Subject: [PATCH] HBASE-18960 A few bug fixes and minor improvements around batchMutate * batch validation and preparation is done before we start iterating over operations for writes * durability, familyCellMaps and observedExceptions are batch wide and are now sotred in BatchOperation, as a result durability is consistent across all operations in a batch * for all operations done by preBatchMutate() CP hook, operation status is updated to success * doWALAppend() is modified to habdle replay and is used from doMiniBatchMutate() * minor improvements Signed-off-by: Michael Stack --- .../hadoop/hbase/regionserver/HRegion.java | 420 ++++++++---------- .../regionserver/TestHRegionReplayEvents.java | 21 + 2 files changed, 207 insertions(+), 234 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0bef925a9e3..1cbb689231f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -661,7 +661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; - private final Durability durability; + private final Durability regionDurability; private final boolean regionStatsEnabled; // Stores the replication scope of the various column families of the table // that has non-default scope @@ -787,9 +787,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ this.rowProcessorTimeout = conf.getLong( "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); - this.durability = htd.getDurability() == Durability.USE_DEFAULT - ? DEFAULT_DURABILITY - : htd.getDurability(); + this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? + DEFAULT_DURABILITY : htd.getDurability(); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -1944,13 +1943,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // These methods are meant to be called periodically by the HRegionServer for // upkeep. ////////////////////////////////////////////////////////////////////////////// - /** - * @return returns size of largest HStore. - */ - public long getLargestHStoreSize() { - return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L); - } - /** * Do preparation for pending compaction. * @throws IOException @@ -3018,21 +3010,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Struct-like class that tracks the progress of a batch operation, - * accumulating status codes and tracking the index at which processing - * is proceeding. + * Struct-like class that tracks the progress of a batch operation, accumulating status codes + * and tracking the index at which processing is proceeding. These batch operations may get + * split into mini-batches for processing. */ private abstract static class BatchOperation { T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; + // reference family cell maps directly so coprocessors can mutate them if desired + Map>[] familyCellMaps; + ObservedExceptionsInBatch observedExceptions; + Durability durability; //Durability of the batch (highest durability of all operations) public BatchOperation(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); + familyCellMaps = new Map[operations.length]; + observedExceptions = new ObservedExceptionsInBatch(); + durability = Durability.USE_DEFAULT; } public abstract Mutation getMutation(int index); @@ -3046,12 +3045,69 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public boolean isDone() { return nextIndexToProcess == operations.length; } + + /** + * Validates each mutation and prepares a batch for write. + * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called + * after prePut()/ preDelete() CP hooks are run for all mutations in a batch. + */ + public void checkAndPrepare(final HRegion region) throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + for (int i = 0 ; i < operations.length; i++) { + // Skip anything that "ran" already + if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { + Mutation mutation = getMutation(i); + + try { + region.checkAndPrepareMutation(mutation, isInReplay(), now); + + // store the family map reference to allow for mutations + familyCellMaps[i] = mutation.getFamilyCellMap(); + // store durability for the batch (highest durability of all operations in the batch) + Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); + if (tmpDur.ordinal() > durability.ordinal()) { + durability = tmpDur; + } + } catch (NoSuchColumnFamilyException nscf) { + final String msg = "No such column family in batch mutation. "; + if (observedExceptions.hasSeenNoSuchFamily()) { + LOG.warn(msg + nscf.getMessage()); + } else { + LOG.warn(msg, nscf); + observedExceptions.sawNoSuchFamily(); + } + retCodeDetails[i] = new OperationStatus( + OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + } catch (FailedSanityCheckException fsce) { + final String msg = "Batch Mutation did not pass sanity check. "; + if (observedExceptions.hasSeenFailedSanityCheck()) { + LOG.warn(msg + fsce.getMessage()); + } else { + LOG.warn(msg, fsce); + observedExceptions.sawFailedSanityCheck(); + } + retCodeDetails[i] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + } catch (WrongRegionException we) { + final String msg = "Batch mutation had a row that does not belong to this region. "; + if (observedExceptions.hasSeenWrongRegion()) { + LOG.warn(msg + we.getMessage()); + } else { + LOG.warn(msg, we); + observedExceptions.sawWrongRegion(); + } + retCodeDetails[i] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + } + } + } + } } - private static class MutationBatch extends BatchOperation { + private static class MutationBatchOperation extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { + public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) { super(operations); this.nonceGroup = nonceGroup; this.nonce = nonce; @@ -3088,9 +3144,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class ReplayBatch extends BatchOperation { + private static class ReplayBatchOperation extends BatchOperation { private long replaySeqId = 0; - public ReplayBatch(MutationReplay[] operations, long seqId) { + public ReplayBatchOperation(MutationReplay[] operations, long seqId) { super(operations); this.replaySeqId = seqId; } @@ -3133,10 +3189,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce)); } - @Override public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -3162,12 +3217,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return statuses; } - return batchMutate(new ReplayBatch(mutations, replaySeqId)); + return batchMutate(new ReplayBatchOperation(mutations, replaySeqId)); } /** * Perform a batch of mutations. - * It supports only Put and Delete mutations and will ignore other types passed. + * It supports only Put and Delete mutations and will ignore other types passed. Operations in + * a batch are stored with highest durability specified of for all operations in a batch, + * except for {@link Durability#SKIP_WAL}. * @param batchOp contains the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. @@ -3187,8 +3244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { - doPreBatchMutateHook(batchOp); + callPreMutateCPHooks(batchOp); } + // validate and prepare batch for write, after CP pre-hooks + batchOp.checkAndPrepare(this); initialized = true; } doMiniBatchMutate(batchOp); @@ -3201,8 +3260,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchOp.retCodeDetails; } - private void doPreBatchMutateHook(BatchOperation batchOp) - throws IOException { + /** + * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch. + * @param batchOp + */ + private void callPreMutateCPHooks(BatchOperation batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { @@ -3252,27 +3314,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long currentNonce = HConstants.NO_NONCE; WALEdit walEdit = null; boolean locked = false; - // reference family maps directly so coprocessors can mutate them if desired - Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; + boolean doneByCoprocessor = false; int noOfPuts = 0; int noOfDeletes = 0; WriteEntry writeEntry = null; int cellCount = 0; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); - MemStoreSize memstoreSize = new MemStoreSize(); - final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); + MemStoreSize memStoreSize = new MemStoreSize(); try { // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); - while (lastIndexExclusive < batchOp.operations.length) { - if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { - lastIndexExclusive++; + for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) { + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { continue; } Mutation mutation = batchOp.getMutation(lastIndexExclusive); @@ -3293,9 +3352,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi acquiredRowLocks.add(rowLock); } - lastIndexExclusive++; numReadyToWrite++; - if (replay) { + if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { for (List cells : mutation.getFamilyCellMap().values()) { cellCount += cells.size(); } @@ -3303,42 +3361,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // We've now grabbed as many mutations off the list as we can - - // STEP 2. Update any LATEST_TIMESTAMP timestamps - // We should record the timestamp only after we have acquired the rowLock, - // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); - byte[] byteNow = Bytes.toBytes(now); - // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) { return; } - for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { - // skip invalid - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } + // STEP 2. Update any LATEST_TIMESTAMP timestamps + // We should record the timestamp only after we have acquired the rowLock, + // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp + long now = EnvironmentEdgeManager.currentTime(); + if (!replay) { + byte[] byteNow = Bytes.toBytes(now); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // skip invalid + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } - Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Put) { - updateCellTimestamps(familyMaps[i].values(), byteNow); - noOfPuts++; - } else { - prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); - noOfDeletes++; - } - rewriteCellTags(familyMaps[i], mutation); - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - cellCount += fromCP.size(); - } - if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { - for (List cells : familyMaps[i].values()) { - cellCount += cells.size(); + Mutation mutation = batchOp.getMutation(i); + if (mutation instanceof Put) { + updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow); + noOfPuts++; + } else { + prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow); + noOfDeletes++; + } + rewriteCellTags(batchOp.familyCellMaps[i], mutation); + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + cellCount += fromCP.size(); } } } @@ -3351,6 +3403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { + doneByCoprocessor = true; return; } else { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -3368,15 +3421,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Else Coprocessor added more Mutations corresponding to the Mutation at this index. for (int j = 0; j < cpMutations.length; j++) { Mutation cpMutation = cpMutations[j]; - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); + checkAndPrepareMutation(cpMutation, replay, now); // Acquire row locks. If not, the whole batch will fail. acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); // Returned mutations from coprocessor correspond to the Mutation at index i. We can // directly add the cells from those mutations to the familyMaps of this mutation. - mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + Map> cpFamilyMap = cpMutation.getFamilyCellMap(); + // will get added to the memStore later + mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); // The durability of returned mutation is replaced by the corresponding mutation. // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the @@ -3393,7 +3447,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 3. Build WAL edit walEdit = new WALEdit(cellCount, replay); - Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { @@ -3401,12 +3454,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Mutation m = batchOp.getMutation(i); - Durability tmpDur = getEffectiveDurability(m.getDurability()); - if (tmpDur.ordinal() > durability.ordinal()) { - durability = tmpDur; - } // we use durability of the original mutation for the mutation passed by CP. - if (tmpDur == Durability.SKIP_WAL) { + if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) { recordMutationWithoutWal(m.getFamilyCellMap()); continue; } @@ -3431,58 +3480,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi walEdit.add(cell); } } - addFamilyMapToWALEdit(familyMaps[i], walEdit); + addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit); } // STEP 4. Append the final edit to WAL and sync. Mutation mutation = batchOp.getMutation(firstIndex); - WALKey walKey = null; - long txid; - if (replay) { - // use wal key from the original - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); - if (!walEdit.isEmpty()) { - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - } - } else { - try { - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - // TODO: Use the doAppend methods below... complicated by the replay stuff above. - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - if (writeEntry == null) { - // if MVCC not preassigned, wait here until assigned - writeEntry = walKey.getWriteEntry(); - } - } - } catch (IOException ioe) { - if (walKey != null && writeEntry == null) { - // the writeEntry is not preassigned and error occurred during append or sync - mvcc.complete(walKey.getWriteEntry()); - } - throw ioe; - } - } - if (walKey == null) { - // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction - // to get sequence id. + writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now, + currentNonceGroup, currentNonce, + replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID); + if (!replay && writeEntry == null) { + // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC + // transaction to get sequence id. writeEntry = mvcc.begin(); } - // STEP 5. Write back to memstore + // STEP 5. Write back to memStore for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; @@ -3493,14 +3505,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we use durability of the original mutation for the mutation passed by CP. boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; if (updateSeqId) { - this.updateSequenceId(familyMaps[i].values(), + this.updateSequenceId(batchOp.familyCellMaps[i].values(), replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); } - applyFamilyMapToMemStore(familyMaps[i], memstoreSize); + applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize); } // update memstore size - this.addAndGetMemStoreSize(memstoreSize); + this.addAndGetMemStoreSize(memStoreSize); // calling the post CP hook for batch mutation if (!replay && coprocessorHost != null) { @@ -3511,30 +3523,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // STEP 6. Complete mvcc. - if (replay) { - this.mvcc.advanceTo(batchOp.getReplaySequenceId()); - } else { - // writeEntry won't be empty if not in replay mode + if (writeEntry != null) { mvcc.completeAndWait(writeEntry); writeEntry = null; } + if (replay) { + this.mvcc.advanceTo(batchOp.getReplaySequenceId()); + } + + success = true; + } finally { + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); - // STEP 7. Release row locks, etc. if (locked) { this.updatesLock.readLock().unlock(); - locked = false; } releaseRowLocks(acquiredRowLocks); - for (int i = firstIndex; i < lastIndexExclusive; i ++) { + for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + batchOp.retCodeDetails[i] = + success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE; } } - // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. - if (!replay && coprocessorHost != null) { + if (!replay && coprocessorHost != null && !doneByCoprocessor) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3550,15 +3565,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - success = true; - } finally { - // Call complete rather than completeAndWait because we probably had error if walKey != null - if (writeEntry != null) mvcc.complete(writeEntry); - if (locked) { - this.updatesLock.readLock().unlock(); - } - releaseRowLocks(acquiredRowLocks); - // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. @@ -3577,13 +3583,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateDelete(); } } - if (!success) { - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.FAILURE; - } - } - } + if (coprocessorHost != null && !batchOp.isInReplay()) { // call the coprocessor hook to do any finalization steps // after the put is done @@ -3622,75 +3622,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.mvcc.complete(walKey.getWriteEntry()); } - private boolean checkBatchOp(BatchOperation batchOp, final int lastIndexExclusive, - final Map>[] familyMaps, final long now, - final ObservedExceptionsInBatch observedExceptions) - throws IOException { - boolean skip = false; - // Skip anything that "ran" already - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - return true; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - Map> familyMap = mutation.getFamilyCellMap(); - // store the family map reference to allow for mutations - familyMaps[lastIndexExclusive] = familyMap; - - try { - checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); - } catch (NoSuchColumnFamilyException nscf) { - final String msg = "No such column family in batch mutation. "; - if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); - } else { - LOG.warn(msg, nscf); - observedExceptions.sawNoSuchFamily(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - skip = true; - } catch (FailedSanityCheckException fsce) { - final String msg = "Batch Mutation did not pass sanity check. "; - if (observedExceptions.hasSeenFailedSanityCheck()) { - LOG.warn(msg + fsce.getMessage()); - } else { - LOG.warn(msg, fsce); - observedExceptions.sawFailedSanityCheck(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - skip = true; - } catch (WrongRegionException we) { - final String msg = "Batch mutation had a row that does not belong to this region. "; - if (observedExceptions.hasSeenWrongRegion()) { - LOG.warn(msg + we.getMessage()); - } else { - LOG.warn(msg, we); - observedExceptions.sawWrongRegion(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); - skip = true; - } - return skip; - } - - private void checkAndPrepareMutation(Mutation mutation, boolean replay, - final Map> familyMap, final long now) - throws IOException { + private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now) + throws IOException { + checkRow(mutation.getRow(), "doMiniBatchMutation"); if (mutation instanceof Put) { // Check the families in the put. If bad, skip this one. if (replay) { - removeNonExistentColumnFamilyForReplay(familyMap); + removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap()); } else { - checkFamilies(familyMap.keySet()); + checkFamilies(mutation.getFamilyCellMap().keySet()); } checkTimestamps(mutation.getFamilyCellMap(), now); } else { prepareDelete((Delete)mutation); } - checkRow(mutation.getRow(), "doMiniBatchMutation"); } /** @@ -3721,7 +3666,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * the table descriptor. */ protected Durability getEffectiveDurability(Durability d) { - return d == Durability.USE_DEFAULT ? this.durability : d; + return d == Durability.USE_DEFAULT ? this.regionDurability : d; } @Override @@ -7430,28 +7375,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi nonceGroup, nonce); } + private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, + long now, long nonceGroup, long nonce) throws IOException { + return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce, + WALKey.NO_SEQUENCE_ID); + } + /** * @return writeEntry associated with this append */ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, - long now, long nonceGroup, long nonce) - throws IOException { + long now, long nonceGroup, long nonce, long replaySeqId) throws IOException { + Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID, + "Invalid replay sequence Id for replay WALEdit!"); WriteEntry writeEntry = null; - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce, mvcc, this.getReplicationScope()); - try { - long txid = - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Call sync on our edit. - if (txid != 0) sync(txid, durability); - writeEntry = walKey.getWriteEntry(); - } catch (IOException ioe) { - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); - throw ioe; + if (!walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, + nonce, mvcc) : + new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, + nonceGroup, nonce, mvcc, this.getReplicationScope()); + if (walEdit.isReplay()) { + walKey.setOrigLogSeqNum(replaySeqId); + } + try { + long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + // Call sync on our edit. + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } return writeEntry; } @@ -7845,13 +7804,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.explicitSplitPoint = null; } - /** - * Give the region a chance to prepare before it is split. - */ - protected void prepareToSplit() { - // nothing - } - /** * Return the splitpoint. null indicates the region isn't splittable * If the splitpoint isn't explicitly specified, it will go over the stores @@ -8115,7 +8067,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Check whether we should sync the wal from the table's durability settings */ private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); + return regionDurability.ordinal() > Durability.ASYNC_WAL.ordinal(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 3665ed8dbd0..22b5064dc42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -309,6 +309,27 @@ public class TestHRegionReplayEvents { TEST_UTIL.getConfiguration()); } + @Test + public void testBatchReplayWithMultipleNonces() throws IOException { + try { + MutationReplay[] mutations = new MutationReplay[100]; + for (int i = 0; i < 100; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.setDurability(Durability.SYNC_WAL); + for (byte[] familly : this.families) { + put.addColumn(familly, this.cq, null); + long nonceNum = i / 10; + mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); + } + } + primaryRegion.batchReplay(mutations, 20); + } catch (Exception e) { + String msg = "Error while replay of batch with multiple nonces. "; + LOG.error(msg, e); + fail(msg + e.getMessage()); + } + } + @Test public void testReplayFlushesAndCompactions() throws IOException { // initiate a secondary region with some data.