HBASE-18960 A few bug fixes and minor improvements around batchMutate

* batch validation and preparation is done before we start iterating over operations for writes
* durability, familyCellMaps and observedExceptions are batch wide and are now sotred in BatchOperation,
  as a result durability is consistent across all operations in a batch
* for all operations done by preBatchMutate() CP hook, operation status is updated to success
* doWALAppend() is modified to habdle replay and is used from doMiniBatchMutate()
* minor improvements

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Umesh Agashe 2017-10-06 15:40:05 -07:00 committed by Michael Stack
parent 70f4c5da47
commit e1941aa6d1
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
2 changed files with 207 additions and 234 deletions

View File

@ -661,7 +661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability;
private final Durability regionDurability;
private final boolean regionStatsEnabled;
// Stores the replication scope of the various column families of the table
// that has non-default scope
@ -787,9 +787,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
this.rowProcessorTimeout = conf.getLong(
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
this.durability = htd.getDurability() == Durability.USE_DEFAULT
? DEFAULT_DURABILITY
: htd.getDurability();
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
DEFAULT_DURABILITY : htd.getDurability();
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@ -1944,13 +1943,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// These methods are meant to be called periodically by the HRegionServer for
// upkeep.
//////////////////////////////////////////////////////////////////////////////
/**
* @return returns size of largest HStore.
*/
public long getLargestHStoreSize() {
return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
}
/**
* Do preparation for pending compaction.
* @throws IOException
@ -3018,21 +3010,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
* Struct-like class that tracks the progress of a batch operation,
* accumulating status codes and tracking the index at which processing
* is proceeding.
* Struct-like class that tracks the progress of a batch operation, accumulating status codes
* and tracking the index at which processing is proceeding. These batch operations may get
* split into mini-batches for processing.
*/
private abstract static class BatchOperation<T> {
T[] operations;
int nextIndexToProcess = 0;
OperationStatus[] retCodeDetails;
WALEdit[] walEditsFromCoprocessors;
// reference family cell maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyCellMaps;
ObservedExceptionsInBatch observedExceptions;
Durability durability; //Durability of the batch (highest durability of all operations)
public BatchOperation(T[] operations) {
this.operations = operations;
this.retCodeDetails = new OperationStatus[operations.length];
this.walEditsFromCoprocessors = new WALEdit[operations.length];
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
familyCellMaps = new Map[operations.length];
observedExceptions = new ObservedExceptionsInBatch();
durability = Durability.USE_DEFAULT;
}
public abstract Mutation getMutation(int index);
@ -3046,12 +3045,69 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public boolean isDone() {
return nextIndexToProcess == operations.length;
}
/**
* Validates each mutation and prepares a batch for write.
* NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
* after prePut()/ preDelete() CP hooks are run for all mutations in a batch.
*/
public void checkAndPrepare(final HRegion region) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
for (int i = 0 ; i < operations.length; i++) {
// Skip anything that "ran" already
if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
Mutation mutation = getMutation(i);
try {
region.checkAndPrepareMutation(mutation, isInReplay(), now);
// store the family map reference to allow for mutations
familyCellMaps[i] = mutation.getFamilyCellMap();
// store durability for the batch (highest durability of all operations in the batch)
Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
} catch (NoSuchColumnFamilyException nscf) {
final String msg = "No such column family in batch mutation. ";
if (observedExceptions.hasSeenNoSuchFamily()) {
LOG.warn(msg + nscf.getMessage());
} else {
LOG.warn(msg, nscf);
observedExceptions.sawNoSuchFamily();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
} catch (FailedSanityCheckException fsce) {
final String msg = "Batch Mutation did not pass sanity check. ";
if (observedExceptions.hasSeenFailedSanityCheck()) {
LOG.warn(msg + fsce.getMessage());
} else {
LOG.warn(msg, fsce);
observedExceptions.sawFailedSanityCheck();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
} catch (WrongRegionException we) {
final String msg = "Batch mutation had a row that does not belong to this region. ";
if (observedExceptions.hasSeenWrongRegion()) {
LOG.warn(msg + we.getMessage());
} else {
LOG.warn(msg, we);
observedExceptions.sawWrongRegion();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
}
}
}
}
}
private static class MutationBatch extends BatchOperation<Mutation> {
private static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup;
private long nonce;
public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) {
super(operations);
this.nonceGroup = nonceGroup;
this.nonce = nonce;
@ -3088,9 +3144,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private static class ReplayBatch extends BatchOperation<MutationReplay> {
private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
public ReplayBatchOperation(MutationReplay[] operations, long seqId) {
super(operations);
this.replaySeqId = seqId;
}
@ -3133,10 +3189,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce));
}
@Override
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@ -3162,12 +3217,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
return statuses;
}
return batchMutate(new ReplayBatch(mutations, replaySeqId));
return batchMutate(new ReplayBatchOperation(mutations, replaySeqId));
}
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
* It supports only Put and Delete mutations and will ignore other types passed. Operations in
* a batch are stored with highest durability specified of for all operations in a batch,
* except for {@link Durability#SKIP_WAL}.
* @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
@ -3187,8 +3244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) {
doPreBatchMutateHook(batchOp);
callPreMutateCPHooks(batchOp);
}
// validate and prepare batch for write, after CP pre-hooks
batchOp.checkAndPrepare(this);
initialized = true;
}
doMiniBatchMutate(batchOp);
@ -3201,8 +3260,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return batchOp.retCodeDetails;
}
private void doPreBatchMutateHook(BatchOperation<?> batchOp)
throws IOException {
/**
* Runs prePut/ preDelete coprocessor hooks for each mutation in a batch.
* @param batchOp
*/
private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) {
@ -3252,27 +3314,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long currentNonce = HConstants.NO_NONCE;
WALEdit walEdit = null;
boolean locked = false;
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
boolean success = false;
boolean doneByCoprocessor = false;
int noOfPuts = 0;
int noOfDeletes = 0;
WriteEntry writeEntry = null;
int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
MemStoreSize memstoreSize = new MemStoreSize();
final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();
MemStoreSize memStoreSize = new MemStoreSize();
try {
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0;
long now = EnvironmentEdgeManager.currentTime();
while (lastIndexExclusive < batchOp.operations.length) {
if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) {
lastIndexExclusive++;
for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
continue;
}
Mutation mutation = batchOp.getMutation(lastIndexExclusive);
@ -3293,9 +3352,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
acquiredRowLocks.add(rowLock);
}
lastIndexExclusive++;
numReadyToWrite++;
if (replay) {
if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
cellCount += cells.size();
}
@ -3303,42 +3361,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// We've now grabbed as many mutations off the list as we can
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
now = EnvironmentEdgeManager.currentTime();
byte[] byteNow = Bytes.toBytes(now);
// Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
if (numReadyToWrite <= 0) {
return;
}
for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
// lastIndexExclusive was incremented above.
continue;
}
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
long now = EnvironmentEdgeManager.currentTime();
if (!replay) {
byte[] byteNow = Bytes.toBytes(now);
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
// lastIndexExclusive was incremented above.
continue;
}
Mutation mutation = batchOp.getMutation(i);
if (mutation instanceof Put) {
updateCellTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++;
} else {
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
rewriteCellTags(familyMaps[i], mutation);
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
if (fromCP != null) {
cellCount += fromCP.size();
}
if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
for (List<Cell> cells : familyMaps[i].values()) {
cellCount += cells.size();
Mutation mutation = batchOp.getMutation(i);
if (mutation instanceof Put) {
updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
noOfPuts++;
} else {
prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
noOfDeletes++;
}
rewriteCellTags(batchOp.familyCellMaps[i], mutation);
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
if (fromCP != null) {
cellCount += fromCP.size();
}
}
}
@ -3351,6 +3403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) {
doneByCoprocessor = true;
return;
} else {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
@ -3368,15 +3421,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Else Coprocessor added more Mutations corresponding to the Mutation at this index.
for (int j = 0; j < cpMutations.length; j++) {
Mutation cpMutation = cpMutations[j];
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now);
checkAndPrepareMutation(cpMutation, replay, now);
// Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
// will get added to the memStore later
mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
// The durability of returned mutation is replaced by the corresponding mutation.
// If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
@ -3393,7 +3447,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 3. Build WAL edit
walEdit = new WALEdit(cellCount, replay);
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
@ -3401,12 +3454,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
Mutation m = batchOp.getMutation(i);
Durability tmpDur = getEffectiveDurability(m.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
// we use durability of the original mutation for the mutation passed by CP.
if (tmpDur == Durability.SKIP_WAL) {
if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
recordMutationWithoutWal(m.getFamilyCellMap());
continue;
}
@ -3431,58 +3480,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walEdit.add(cell);
}
}
addFamilyMapToWALEdit(familyMaps[i], walEdit);
addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
}
// STEP 4. Append the final edit to WAL and sync.
Mutation mutation = batchOp.getMutation(firstIndex);
WALKey walKey = null;
long txid;
if (replay) {
// use wal key from the original
walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
if (!walEdit.isEmpty()) {
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
if (txid != 0) {
sync(txid, durability);
}
}
} else {
try {
if (!walEdit.isEmpty()) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
this.getReplicationScope());
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
if (txid != 0) {
sync(txid, durability);
}
if (writeEntry == null) {
// if MVCC not preassigned, wait here until assigned
writeEntry = walKey.getWriteEntry();
}
}
} catch (IOException ioe) {
if (walKey != null && writeEntry == null) {
// the writeEntry is not preassigned and error occurred during append or sync
mvcc.complete(walKey.getWriteEntry());
}
throw ioe;
}
}
if (walKey == null) {
// If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction
// to get sequence id.
writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
currentNonceGroup, currentNonce,
replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
if (!replay && writeEntry == null) {
// If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
// transaction to get sequence id.
writeEntry = mvcc.begin();
}
// STEP 5. Write back to memstore
// STEP 5. Write back to memStore
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
@ -3493,14 +3505,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use durability of the original mutation for the mutation passed by CP.
boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) {
this.updateSequenceId(familyMaps[i].values(),
this.updateSequenceId(batchOp.familyCellMaps[i].values(),
replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
}
applyFamilyMapToMemStore(familyMaps[i], memstoreSize);
applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize);
}
// update memstore size
this.addAndGetMemStoreSize(memstoreSize);
this.addAndGetMemStoreSize(memStoreSize);
// calling the post CP hook for batch mutation
if (!replay && coprocessorHost != null) {
@ -3511,30 +3523,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// STEP 6. Complete mvcc.
if (replay) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
} else {
// writeEntry won't be empty if not in replay mode
if (writeEntry != null) {
mvcc.completeAndWait(writeEntry);
writeEntry = null;
}
if (replay) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
}
success = true;
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
if (writeEntry != null) mvcc.complete(writeEntry);
// STEP 7. Release row locks, etc.
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
releaseRowLocks(acquiredRowLocks);
for (int i = firstIndex; i < lastIndexExclusive; i ++) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
batchOp.retCodeDetails[i] =
success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
}
}
// STEP 8. Run coprocessor post hooks. This should be done after the wal is
// synced so that the coprocessor contract is adhered to.
if (!replay && coprocessorHost != null) {
if (!replay && coprocessorHost != null && !doneByCoprocessor) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts
if (batchOp.retCodeDetails[i].getOperationStatusCode()
@ -3550,15 +3565,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
success = true;
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
if (writeEntry != null) mvcc.complete(writeEntry);
if (locked) {
this.updatesLock.readLock().unlock();
}
releaseRowLocks(acquiredRowLocks);
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
@ -3577,13 +3583,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.metricsRegion.updateDelete();
}
}
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
}
}
}
if (coprocessorHost != null && !batchOp.isInReplay()) {
// call the coprocessor hook to do any finalization steps
// after the put is done
@ -3622,75 +3622,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.mvcc.complete(walKey.getWriteEntry());
}
private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,
final Map<byte[], List<Cell>>[] familyMaps, final long now,
final ObservedExceptionsInBatch observedExceptions)
throws IOException {
boolean skip = false;
// Skip anything that "ran" already
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
return true;
}
Mutation mutation = batchOp.getMutation(lastIndexExclusive);
Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
// store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap;
try {
checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now);
} catch (NoSuchColumnFamilyException nscf) {
final String msg = "No such column family in batch mutation. ";
if (observedExceptions.hasSeenNoSuchFamily()) {
LOG.warn(msg + nscf.getMessage());
} else {
LOG.warn(msg, nscf);
observedExceptions.sawNoSuchFamily();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
skip = true;
} catch (FailedSanityCheckException fsce) {
final String msg = "Batch Mutation did not pass sanity check. ";
if (observedExceptions.hasSeenFailedSanityCheck()) {
LOG.warn(msg + fsce.getMessage());
} else {
LOG.warn(msg, fsce);
observedExceptions.sawFailedSanityCheck();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
skip = true;
} catch (WrongRegionException we) {
final String msg = "Batch mutation had a row that does not belong to this region. ";
if (observedExceptions.hasSeenWrongRegion()) {
LOG.warn(msg + we.getMessage());
} else {
LOG.warn(msg, we);
observedExceptions.sawWrongRegion();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
skip = true;
}
return skip;
}
private void checkAndPrepareMutation(Mutation mutation, boolean replay,
final Map<byte[], List<Cell>> familyMap, final long now)
throws IOException {
private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now)
throws IOException {
checkRow(mutation.getRow(), "doMiniBatchMutation");
if (mutation instanceof Put) {
// Check the families in the put. If bad, skip this one.
if (replay) {
removeNonExistentColumnFamilyForReplay(familyMap);
removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap());
} else {
checkFamilies(familyMap.keySet());
checkFamilies(mutation.getFamilyCellMap().keySet());
}
checkTimestamps(mutation.getFamilyCellMap(), now);
} else {
prepareDelete((Delete)mutation);
}
checkRow(mutation.getRow(), "doMiniBatchMutation");
}
/**
@ -3721,7 +3666,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* the table descriptor.
*/
protected Durability getEffectiveDurability(Durability d) {
return d == Durability.USE_DEFAULT ? this.durability : d;
return d == Durability.USE_DEFAULT ? this.regionDurability : d;
}
@Override
@ -7430,28 +7375,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nonceGroup, nonce);
}
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce) throws IOException {
return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
WALKey.NO_SEQUENCE_ID);
}
/**
* @return writeEntry associated with this append
*/
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce)
throws IOException {
long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
WriteEntry writeEntry = null;
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc, this.getReplicationScope());
try {
long txid =
this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
if (walKey != null) mvcc.complete(walKey.getWriteEntry());
throw ioe;
if (!walEdit.isEmpty()) {
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
nonce, mvcc) :
new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc, this.getReplicationScope());
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(replaySeqId);
}
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
if (walKey != null) mvcc.complete(walKey.getWriteEntry());
throw ioe;
}
}
return writeEntry;
}
@ -7845,13 +7804,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.explicitSplitPoint = null;
}
/**
* Give the region a chance to prepare before it is split.
*/
protected void prepareToSplit() {
// nothing
}
/**
* Return the splitpoint. null indicates the region isn't splittable
* If the splitpoint isn't explicitly specified, it will go over the stores
@ -8115,7 +8067,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Check whether we should sync the wal from the table's durability settings
*/
private boolean shouldSyncWAL() {
return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
return regionDurability.ordinal() > Durability.ASYNC_WAL.ordinal();
}
/**

View File

@ -309,6 +309,27 @@ public class TestHRegionReplayEvents {
TEST_UTIL.getConfiguration());
}
@Test
public void testBatchReplayWithMultipleNonces() throws IOException {
try {
MutationReplay[] mutations = new MutationReplay[100];
for (int i = 0; i < 100; i++) {
Put put = new Put(Bytes.toBytes(i));
put.setDurability(Durability.SYNC_WAL);
for (byte[] familly : this.families) {
put.addColumn(familly, this.cq, null);
long nonceNum = i / 10;
mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
}
}
primaryRegion.batchReplay(mutations, 20);
} catch (Exception e) {
String msg = "Error while replay of batch with multiple nonces. ";
LOG.error(msg, e);
fail(msg + e.getMessage());
}
}
@Test
public void testReplayFlushesAndCompactions() throws IOException {
// initiate a secondary region with some data.