HBASE-18960 A few bug fixes and minor improvements around batchMutate

* batch validation and preparation is done before we start iterating over operations for writes
* durability, familyCellMaps and observedExceptions are batch wide and are now sotred in BatchOperation,
  as a result durability is consistent across all operations in a batch
* for all operations done by preBatchMutate() CP hook, operation status is updated to success
* doWALAppend() is modified to habdle replay and is used from doMiniBatchMutate()
* minor improvements

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Umesh Agashe 2017-10-06 15:40:05 -07:00 committed by Michael Stack
parent 2cb64fb467
commit 08dff49620
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
2 changed files with 207 additions and 234 deletions

View File

@ -661,7 +661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final MetricsRegion metricsRegion; private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper; private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability; private final Durability regionDurability;
private final boolean regionStatsEnabled; private final boolean regionStatsEnabled;
// Stores the replication scope of the various column families of the table // Stores the replication scope of the various column families of the table
// that has non-default scope // that has non-default scope
@ -787,9 +787,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
this.rowProcessorTimeout = conf.getLong( this.rowProcessorTimeout = conf.getLong(
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
this.durability = htd.getDurability() == Durability.USE_DEFAULT this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
? DEFAULT_DURABILITY DEFAULT_DURABILITY : htd.getDurability();
: htd.getDurability();
if (rsServices != null) { if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting(); this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver // don't initialize coprocessors if not running within a regionserver
@ -1944,13 +1943,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// These methods are meant to be called periodically by the HRegionServer for // These methods are meant to be called periodically by the HRegionServer for
// upkeep. // upkeep.
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/**
* @return returns size of largest HStore.
*/
public long getLargestHStoreSize() {
return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
}
/** /**
* Do preparation for pending compaction. * Do preparation for pending compaction.
* @throws IOException * @throws IOException
@ -3018,21 +3010,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
/** /**
* Struct-like class that tracks the progress of a batch operation, * Struct-like class that tracks the progress of a batch operation, accumulating status codes
* accumulating status codes and tracking the index at which processing * and tracking the index at which processing is proceeding. These batch operations may get
* is proceeding. * split into mini-batches for processing.
*/ */
private abstract static class BatchOperation<T> { private abstract static class BatchOperation<T> {
T[] operations; T[] operations;
int nextIndexToProcess = 0; int nextIndexToProcess = 0;
OperationStatus[] retCodeDetails; OperationStatus[] retCodeDetails;
WALEdit[] walEditsFromCoprocessors; WALEdit[] walEditsFromCoprocessors;
// reference family cell maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyCellMaps;
ObservedExceptionsInBatch observedExceptions;
Durability durability; //Durability of the batch (highest durability of all operations)
public BatchOperation(T[] operations) { public BatchOperation(T[] operations) {
this.operations = operations; this.operations = operations;
this.retCodeDetails = new OperationStatus[operations.length]; this.retCodeDetails = new OperationStatus[operations.length];
this.walEditsFromCoprocessors = new WALEdit[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length];
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
familyCellMaps = new Map[operations.length];
observedExceptions = new ObservedExceptionsInBatch();
durability = Durability.USE_DEFAULT;
} }
public abstract Mutation getMutation(int index); public abstract Mutation getMutation(int index);
@ -3046,12 +3045,69 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public boolean isDone() { public boolean isDone() {
return nextIndexToProcess == operations.length; return nextIndexToProcess == operations.length;
} }
/**
* Validates each mutation and prepares a batch for write.
* NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
* after prePut()/ preDelete() CP hooks are run for all mutations in a batch.
*/
public void checkAndPrepare(final HRegion region) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
for (int i = 0 ; i < operations.length; i++) {
// Skip anything that "ran" already
if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
Mutation mutation = getMutation(i);
try {
region.checkAndPrepareMutation(mutation, isInReplay(), now);
// store the family map reference to allow for mutations
familyCellMaps[i] = mutation.getFamilyCellMap();
// store durability for the batch (highest durability of all operations in the batch)
Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
} catch (NoSuchColumnFamilyException nscf) {
final String msg = "No such column family in batch mutation. ";
if (observedExceptions.hasSeenNoSuchFamily()) {
LOG.warn(msg + nscf.getMessage());
} else {
LOG.warn(msg, nscf);
observedExceptions.sawNoSuchFamily();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
} catch (FailedSanityCheckException fsce) {
final String msg = "Batch Mutation did not pass sanity check. ";
if (observedExceptions.hasSeenFailedSanityCheck()) {
LOG.warn(msg + fsce.getMessage());
} else {
LOG.warn(msg, fsce);
observedExceptions.sawFailedSanityCheck();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
} catch (WrongRegionException we) {
final String msg = "Batch mutation had a row that does not belong to this region. ";
if (observedExceptions.hasSeenWrongRegion()) {
LOG.warn(msg + we.getMessage());
} else {
LOG.warn(msg, we);
observedExceptions.sawWrongRegion();
}
retCodeDetails[i] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
}
}
}
}
} }
private static class MutationBatch extends BatchOperation<Mutation> { private static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup; private long nonceGroup;
private long nonce; private long nonce;
public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) {
super(operations); super(operations);
this.nonceGroup = nonceGroup; this.nonceGroup = nonceGroup;
this.nonce = nonce; this.nonce = nonce;
@ -3088,9 +3144,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
private static class ReplayBatch extends BatchOperation<MutationReplay> { private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long replaySeqId = 0; private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) { public ReplayBatchOperation(MutationReplay[] operations, long seqId) {
super(operations); super(operations);
this.replaySeqId = seqId; this.replaySeqId = seqId;
} }
@ -3133,10 +3189,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint). // * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce));
} }
@Override
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
} }
@ -3162,12 +3217,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
return statuses; return statuses;
} }
return batchMutate(new ReplayBatch(mutations, replaySeqId)); return batchMutate(new ReplayBatchOperation(mutations, replaySeqId));
} }
/** /**
* Perform a batch of mutations. * Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed. * It supports only Put and Delete mutations and will ignore other types passed. Operations in
* a batch are stored with highest durability specified of for all operations in a batch,
* except for {@link Durability#SKIP_WAL}.
* @param batchOp contains the list of mutations * @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the * @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any. * OperationStatusCode and the exceptionMessage if any.
@ -3187,8 +3244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!initialized) { if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length); this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) { if (!batchOp.isInReplay()) {
doPreBatchMutateHook(batchOp); callPreMutateCPHooks(batchOp);
} }
// validate and prepare batch for write, after CP pre-hooks
batchOp.checkAndPrepare(this);
initialized = true; initialized = true;
} }
doMiniBatchMutate(batchOp); doMiniBatchMutate(batchOp);
@ -3201,8 +3260,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return batchOp.retCodeDetails; return batchOp.retCodeDetails;
} }
private void doPreBatchMutateHook(BatchOperation<?> batchOp) /**
throws IOException { * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch.
* @param batchOp
*/
private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */ /* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit(); WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) { if (coprocessorHost != null) {
@ -3252,27 +3314,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long currentNonce = HConstants.NO_NONCE; long currentNonce = HConstants.NO_NONCE;
WALEdit walEdit = null; WALEdit walEdit = null;
boolean locked = false; boolean locked = false;
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive) // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess; int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex; int lastIndexExclusive = firstIndex;
boolean success = false; boolean success = false;
boolean doneByCoprocessor = false;
int noOfPuts = 0; int noOfPuts = 0;
int noOfDeletes = 0; int noOfDeletes = 0;
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
int cellCount = 0; int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */ /** Keep track of the locks we hold so we can release them in finally clause */
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSize memStoreSize = new MemStoreSize();
final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();
try { try {
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0; int numReadyToWrite = 0;
long now = EnvironmentEdgeManager.currentTime(); for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
while (lastIndexExclusive < batchOp.operations.length) { if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { != OperationStatusCode.NOT_RUN) {
lastIndexExclusive++;
continue; continue;
} }
Mutation mutation = batchOp.getMutation(lastIndexExclusive); Mutation mutation = batchOp.getMutation(lastIndexExclusive);
@ -3293,9 +3352,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
acquiredRowLocks.add(rowLock); acquiredRowLocks.add(rowLock);
} }
lastIndexExclusive++;
numReadyToWrite++; numReadyToWrite++;
if (replay) { if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
for (List<Cell> cells : mutation.getFamilyCellMap().values()) { for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
cellCount += cells.size(); cellCount += cells.size();
} }
@ -3303,42 +3361,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
// We've now grabbed as many mutations off the list as we can // We've now grabbed as many mutations off the list as we can
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
now = EnvironmentEdgeManager.currentTime();
byte[] byteNow = Bytes.toBytes(now);
// Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
if (numReadyToWrite <= 0) { if (numReadyToWrite <= 0) {
return; return;
} }
for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { // STEP 2. Update any LATEST_TIMESTAMP timestamps
// skip invalid // We should record the timestamp only after we have acquired the rowLock,
if (batchOp.retCodeDetails[i].getOperationStatusCode() // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
!= OperationStatusCode.NOT_RUN) { long now = EnvironmentEdgeManager.currentTime();
// lastIndexExclusive was incremented above. if (!replay) {
continue; byte[] byteNow = Bytes.toBytes(now);
} for (int i = firstIndex; i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
// lastIndexExclusive was incremented above.
continue;
}
Mutation mutation = batchOp.getMutation(i); Mutation mutation = batchOp.getMutation(i);
if (mutation instanceof Put) { if (mutation instanceof Put) {
updateCellTimestamps(familyMaps[i].values(), byteNow); updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
noOfPuts++; noOfPuts++;
} else { } else {
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
noOfDeletes++; noOfDeletes++;
} }
rewriteCellTags(familyMaps[i], mutation); rewriteCellTags(batchOp.familyCellMaps[i], mutation);
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
if (fromCP != null) { if (fromCP != null) {
cellCount += fromCP.size(); cellCount += fromCP.size();
}
if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
for (List<Cell> cells : familyMaps[i].values()) {
cellCount += cells.size();
} }
} }
} }
@ -3351,6 +3403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) { if (coprocessorHost.preBatchMutate(miniBatchOp)) {
doneByCoprocessor = true;
return; return;
} else { } else {
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
@ -3368,15 +3421,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Else Coprocessor added more Mutations corresponding to the Mutation at this index. // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
for (int j = 0; j < cpMutations.length; j++) { for (int j = 0; j < cpMutations.length; j++) {
Mutation cpMutation = cpMutations[j]; Mutation cpMutation = cpMutations[j];
Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); checkAndPrepareMutation(cpMutation, replay, now);
checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now);
// Acquire row locks. If not, the whole batch will fail. // Acquire row locks. If not, the whole batch will fail.
acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can // Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation. // directly add the cells from those mutations to the familyMaps of this mutation.
mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
// will get added to the memStore later
mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
// The durability of returned mutation is replaced by the corresponding mutation. // The durability of returned mutation is replaced by the corresponding mutation.
// If the corresponding mutation contains the SKIP_WAL, we shouldn't count the // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
@ -3393,7 +3447,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 3. Build WAL edit // STEP 3. Build WAL edit
walEdit = new WALEdit(cellCount, replay); walEdit = new WALEdit(cellCount, replay);
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing // Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
@ -3401,12 +3454,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
Mutation m = batchOp.getMutation(i); Mutation m = batchOp.getMutation(i);
Durability tmpDur = getEffectiveDurability(m.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
// we use durability of the original mutation for the mutation passed by CP. // we use durability of the original mutation for the mutation passed by CP.
if (tmpDur == Durability.SKIP_WAL) { if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
recordMutationWithoutWal(m.getFamilyCellMap()); recordMutationWithoutWal(m.getFamilyCellMap());
continue; continue;
} }
@ -3431,58 +3480,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walEdit.add(cell); walEdit.add(cell);
} }
} }
addFamilyMapToWALEdit(familyMaps[i], walEdit); addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
} }
// STEP 4. Append the final edit to WAL and sync. // STEP 4. Append the final edit to WAL and sync.
Mutation mutation = batchOp.getMutation(firstIndex); Mutation mutation = batchOp.getMutation(firstIndex);
WALKey walKey = null; writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
long txid; currentNonceGroup, currentNonce,
if (replay) { replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
// use wal key from the original if (!replay && writeEntry == null) {
walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, // transaction to get sequence id.
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
if (!walEdit.isEmpty()) {
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
if (txid != 0) {
sync(txid, durability);
}
}
} else {
try {
if (!walEdit.isEmpty()) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
this.getReplicationScope());
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
if (txid != 0) {
sync(txid, durability);
}
if (writeEntry == null) {
// if MVCC not preassigned, wait here until assigned
writeEntry = walKey.getWriteEntry();
}
}
} catch (IOException ioe) {
if (walKey != null && writeEntry == null) {
// the writeEntry is not preassigned and error occurred during append or sync
mvcc.complete(walKey.getWriteEntry());
}
throw ioe;
}
}
if (walKey == null) {
// If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction
// to get sequence id.
writeEntry = mvcc.begin(); writeEntry = mvcc.begin();
} }
// STEP 5. Write back to memstore // STEP 5. Write back to memStore
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue; continue;
@ -3493,14 +3505,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use durability of the original mutation for the mutation passed by CP. // we use durability of the original mutation for the mutation passed by CP.
boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
if (updateSeqId) { if (updateSeqId) {
this.updateSequenceId(familyMaps[i].values(), this.updateSequenceId(batchOp.familyCellMaps[i].values(),
replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
} }
applyFamilyMapToMemStore(familyMaps[i], memstoreSize); applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize);
} }
// update memstore size // update memstore size
this.addAndGetMemStoreSize(memstoreSize); this.addAndGetMemStoreSize(memStoreSize);
// calling the post CP hook for batch mutation // calling the post CP hook for batch mutation
if (!replay && coprocessorHost != null) { if (!replay && coprocessorHost != null) {
@ -3511,30 +3523,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
// STEP 6. Complete mvcc. // STEP 6. Complete mvcc.
if (replay) { if (writeEntry != null) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
} else {
// writeEntry won't be empty if not in replay mode
mvcc.completeAndWait(writeEntry); mvcc.completeAndWait(writeEntry);
writeEntry = null; writeEntry = null;
} }
if (replay) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
}
success = true;
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
if (writeEntry != null) mvcc.complete(writeEntry);
// STEP 7. Release row locks, etc.
if (locked) { if (locked) {
this.updatesLock.readLock().unlock(); this.updatesLock.readLock().unlock();
locked = false;
} }
releaseRowLocks(acquiredRowLocks); releaseRowLocks(acquiredRowLocks);
for (int i = firstIndex; i < lastIndexExclusive; i ++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; batchOp.retCodeDetails[i] =
success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
} }
} }
// STEP 8. Run coprocessor post hooks. This should be done after the wal is
// synced so that the coprocessor contract is adhered to. // synced so that the coprocessor contract is adhered to.
if (!replay && coprocessorHost != null) { if (!replay && coprocessorHost != null && !doneByCoprocessor) {
for (int i = firstIndex; i < lastIndexExclusive; i++) { for (int i = firstIndex; i < lastIndexExclusive; i++) {
// only for successful puts // only for successful puts
if (batchOp.retCodeDetails[i].getOperationStatusCode() if (batchOp.retCodeDetails[i].getOperationStatusCode()
@ -3550,15 +3565,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
success = true;
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
if (writeEntry != null) mvcc.complete(writeEntry);
if (locked) {
this.updatesLock.readLock().unlock();
}
releaseRowLocks(acquiredRowLocks);
// See if the column families were consistent through the whole thing. // See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null. // if they were then keep them. If they were not then pass a null.
// null will be treated as unknown. // null will be treated as unknown.
@ -3577,13 +3583,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.metricsRegion.updateDelete(); this.metricsRegion.updateDelete();
} }
} }
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
}
}
}
if (coprocessorHost != null && !batchOp.isInReplay()) { if (coprocessorHost != null && !batchOp.isInReplay()) {
// call the coprocessor hook to do any finalization steps // call the coprocessor hook to do any finalization steps
// after the put is done // after the put is done
@ -3622,75 +3622,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.mvcc.complete(walKey.getWriteEntry()); this.mvcc.complete(walKey.getWriteEntry());
} }
private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive, private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now)
final Map<byte[], List<Cell>>[] familyMaps, final long now, throws IOException {
final ObservedExceptionsInBatch observedExceptions) checkRow(mutation.getRow(), "doMiniBatchMutation");
throws IOException {
boolean skip = false;
// Skip anything that "ran" already
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
return true;
}
Mutation mutation = batchOp.getMutation(lastIndexExclusive);
Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
// store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap;
try {
checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now);
} catch (NoSuchColumnFamilyException nscf) {
final String msg = "No such column family in batch mutation. ";
if (observedExceptions.hasSeenNoSuchFamily()) {
LOG.warn(msg + nscf.getMessage());
} else {
LOG.warn(msg, nscf);
observedExceptions.sawNoSuchFamily();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
skip = true;
} catch (FailedSanityCheckException fsce) {
final String msg = "Batch Mutation did not pass sanity check. ";
if (observedExceptions.hasSeenFailedSanityCheck()) {
LOG.warn(msg + fsce.getMessage());
} else {
LOG.warn(msg, fsce);
observedExceptions.sawFailedSanityCheck();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
skip = true;
} catch (WrongRegionException we) {
final String msg = "Batch mutation had a row that does not belong to this region. ";
if (observedExceptions.hasSeenWrongRegion()) {
LOG.warn(msg + we.getMessage());
} else {
LOG.warn(msg, we);
observedExceptions.sawWrongRegion();
}
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
skip = true;
}
return skip;
}
private void checkAndPrepareMutation(Mutation mutation, boolean replay,
final Map<byte[], List<Cell>> familyMap, final long now)
throws IOException {
if (mutation instanceof Put) { if (mutation instanceof Put) {
// Check the families in the put. If bad, skip this one. // Check the families in the put. If bad, skip this one.
if (replay) { if (replay) {
removeNonExistentColumnFamilyForReplay(familyMap); removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap());
} else { } else {
checkFamilies(familyMap.keySet()); checkFamilies(mutation.getFamilyCellMap().keySet());
} }
checkTimestamps(mutation.getFamilyCellMap(), now); checkTimestamps(mutation.getFamilyCellMap(), now);
} else { } else {
prepareDelete((Delete)mutation); prepareDelete((Delete)mutation);
} }
checkRow(mutation.getRow(), "doMiniBatchMutation");
} }
/** /**
@ -3721,7 +3666,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* the table descriptor. * the table descriptor.
*/ */
protected Durability getEffectiveDurability(Durability d) { protected Durability getEffectiveDurability(Durability d) {
return d == Durability.USE_DEFAULT ? this.durability : d; return d == Durability.USE_DEFAULT ? this.regionDurability : d;
} }
@Override @Override
@ -7430,28 +7375,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nonceGroup, nonce); nonceGroup, nonce);
} }
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce) throws IOException {
return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
WALKey.NO_SEQUENCE_ID);
}
/** /**
* @return writeEntry associated with this append * @return writeEntry associated with this append
*/ */
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds, private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce) long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
throws IOException { Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
// Using default cluster id, as this can only happen in the originating cluster. if (!walEdit.isEmpty()) {
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey // Using default cluster id, as this can only happen in the originating cluster.
// here instead of WALKey directly to support legacy coprocessors. // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), // here instead of WALKey directly to support legacy coprocessors.
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
nonceGroup, nonce, mvcc, this.getReplicationScope()); this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
try { nonce, mvcc) :
long txid = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.wal.append(this.getRegionInfo(), walKey, walEdit, true); this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
// Call sync on our edit. nonceGroup, nonce, mvcc, this.getReplicationScope());
if (txid != 0) sync(txid, durability); if (walEdit.isReplay()) {
writeEntry = walKey.getWriteEntry(); walKey.setOrigLogSeqNum(replaySeqId);
} catch (IOException ioe) { }
if (walKey != null) mvcc.complete(walKey.getWriteEntry()); try {
throw ioe; long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
if (walKey != null) mvcc.complete(walKey.getWriteEntry());
throw ioe;
}
} }
return writeEntry; return writeEntry;
} }
@ -7845,13 +7804,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.explicitSplitPoint = null; this.explicitSplitPoint = null;
} }
/**
* Give the region a chance to prepare before it is split.
*/
protected void prepareToSplit() {
// nothing
}
/** /**
* Return the splitpoint. null indicates the region isn't splittable * Return the splitpoint. null indicates the region isn't splittable
* If the splitpoint isn't explicitly specified, it will go over the stores * If the splitpoint isn't explicitly specified, it will go over the stores
@ -8115,7 +8067,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Check whether we should sync the wal from the table's durability settings * Check whether we should sync the wal from the table's durability settings
*/ */
private boolean shouldSyncWAL() { private boolean shouldSyncWAL() {
return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); return regionDurability.ordinal() > Durability.ASYNC_WAL.ordinal();
} }
/** /**

View File

@ -309,6 +309,27 @@ public class TestHRegionReplayEvents {
TEST_UTIL.getConfiguration()); TEST_UTIL.getConfiguration());
} }
@Test
public void testBatchReplayWithMultipleNonces() throws IOException {
try {
MutationReplay[] mutations = new MutationReplay[100];
for (int i = 0; i < 100; i++) {
Put put = new Put(Bytes.toBytes(i));
put.setDurability(Durability.SYNC_WAL);
for (byte[] familly : this.families) {
put.addColumn(familly, this.cq, null);
long nonceNum = i / 10;
mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
}
}
primaryRegion.batchReplay(mutations, 20);
} catch (Exception e) {
String msg = "Error while replay of batch with multiple nonces. ";
LOG.error(msg, e);
fail(msg + e.getMessage());
}
}
@Test @Test
public void testReplayFlushesAndCompactions() throws IOException { public void testReplayFlushesAndCompactions() throws IOException {
// initiate a secondary region with some data. // initiate a secondary region with some data.