HBASE-15204 Try to estimate the cell count for adding into WALEdit (Revert

for making it more cleaner)
This commit is contained in:
ramkrishna 2016-02-06 13:05:13 +05:30
parent 59b03c77de
commit 4e44f4f505
2 changed files with 10 additions and 23 deletions

View File

@ -2906,26 +2906,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* OperationStatusCode and the exceptionMessage if any. * OperationStatusCode and the exceptionMessage if any.
* @throws IOException * @throws IOException
*/ */
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
throws IOException {
boolean initialized = false; boolean initialized = false;
Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
startRegionOperation(op); startRegionOperation(op);
int cellCountFromCP = 0;
try { try {
while (!batchOp.isDone()) { while (!batchOp.isDone()) {
if (!batchOp.isInReplay()) { if (!batchOp.isInReplay()) {
checkReadOnly(); checkReadOnly();
} }
checkResources(); checkResources();
if (!initialized) { if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length); this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) { if (!batchOp.isInReplay()) {
cellCountFromCP = doPreMutationHook(batchOp); doPreMutationHook(batchOp);
} }
initialized = true; initialized = true;
} }
long addedSize = doMiniBatchMutation(batchOp, cellCountFromCP); long addedSize = doMiniBatchMutation(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize); long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
if (isFlushSize(newSize)) { if (isFlushSize(newSize)) {
requestFlush(); requestFlush();
@ -2938,11 +2937,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
private int doPreMutationHook(BatchOperationInProgress<?> batchOp) private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
throws IOException { throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */ /* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit(); WALEdit walEdit = new WALEdit();
int cellCount = 0;
if (coprocessorHost != null) { if (coprocessorHost != null) {
for (int i = 0 ; i < batchOp.operations.length; i++) { for (int i = 0 ; i < batchOp.operations.length; i++) {
Mutation m = batchOp.getMutation(i); Mutation m = batchOp.getMutation(i);
@ -2972,17 +2970,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if (!walEdit.isEmpty()) { if (!walEdit.isEmpty()) {
batchOp.walEditsFromCoprocessors[i] = walEdit; batchOp.walEditsFromCoprocessors[i] = walEdit;
cellCount += walEdit.size();
walEdit = new WALEdit(); walEdit = new WALEdit();
} }
} }
} }
return cellCount;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp, int cellCount) private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
throws IOException {
boolean isInReplay = batchOp.isInReplay(); boolean isInReplay = batchOp.isInReplay();
// variable to note if all Put items are for the same CF -- metrics related // variable to note if all Put items are for the same CF -- metrics related
boolean putsCfSetConsistent = true; boolean putsCfSetConsistent = true;
@ -2994,7 +2989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Set<byte[]> deletesCfSet = null; Set<byte[]> deletesCfSet = null;
long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
WALEdit walEdit = null; WALEdit walEdit = new WALEdit(isInReplay);
MultiVersionConcurrencyControl.WriteEntry writeEntry = null; MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long txid = 0; long txid = 0;
boolean doRollBackMemstore = false; boolean doRollBackMemstore = false;
@ -3025,6 +3020,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
// store the family map reference to allow for mutations // store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap; familyMaps[lastIndexExclusive] = familyMap;
// skip anything that "ran" already // skip anything that "ran" already
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) { != OperationStatusCode.NOT_RUN) {
@ -3131,11 +3127,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
noOfDeletes++; noOfDeletes++;
} }
rewriteCellTags(familyMaps[i], mutation); rewriteCellTags(familyMaps[i], mutation);
for (List<Cell> cells : familyMaps[i].values()) {
cellCount += cells.size();
} }
}
walEdit = new WALEdit(cellCount);
lock(this.updatesLock.readLock(), numReadyToWrite); lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true; locked = true;

View File

@ -99,7 +99,7 @@ public class WALEdit implements Writable, HeapSize {
private final int VERSION_2 = -1; private final int VERSION_2 = -1;
private final boolean isReplay; private final boolean isReplay;
private ArrayList<Cell> cells = null; private ArrayList<Cell> cells = new ArrayList<Cell>(1);
public static final WALEdit EMPTY_WALEDIT = new WALEdit(); public static final WALEdit EMPTY_WALEDIT = new WALEdit();
@ -118,12 +118,6 @@ public class WALEdit implements Writable, HeapSize {
public WALEdit(boolean isReplay) { public WALEdit(boolean isReplay) {
this.isReplay = isReplay; this.isReplay = isReplay;
cells = new ArrayList<Cell>(1);
}
public WALEdit(int cellCount) {
this.isReplay = false;
cells = new ArrayList<Cell>(cellCount);
} }
/** /**