HBASE-15204 Try to estimate the cell count for adding into WALEdit (Ram)
This commit is contained in:
parent
38e79b09c9
commit
f9ce069e15
|
@ -2929,25 +2929,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
* OperationStatusCode and the exceptionMessage if any.
|
* OperationStatusCode and the exceptionMessage if any.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
|
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp)
|
||||||
|
throws IOException {
|
||||||
boolean initialized = false;
|
boolean initialized = false;
|
||||||
Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
|
Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
|
||||||
startRegionOperation(op);
|
startRegionOperation(op);
|
||||||
|
int cellCountFromCP = 0;
|
||||||
try {
|
try {
|
||||||
while (!batchOp.isDone()) {
|
while (!batchOp.isDone()) {
|
||||||
if (!batchOp.isInReplay()) {
|
if (!batchOp.isInReplay()) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
}
|
}
|
||||||
checkResources();
|
checkResources();
|
||||||
|
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
this.writeRequestsCount.add(batchOp.operations.length);
|
this.writeRequestsCount.add(batchOp.operations.length);
|
||||||
if (!batchOp.isInReplay()) {
|
if (!batchOp.isInReplay()) {
|
||||||
doPreMutationHook(batchOp);
|
cellCountFromCP = doPreMutationHook(batchOp);
|
||||||
}
|
}
|
||||||
initialized = true;
|
initialized = true;
|
||||||
}
|
}
|
||||||
long addedSize = doMiniBatchMutation(batchOp);
|
long addedSize = doMiniBatchMutation(batchOp, cellCountFromCP);
|
||||||
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
|
||||||
if (isFlushSize(newSize)) {
|
if (isFlushSize(newSize)) {
|
||||||
requestFlush();
|
requestFlush();
|
||||||
|
@ -2960,10 +2961,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
|
private int doPreMutationHook(BatchOperationInProgress<?> batchOp)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
/* Run coprocessor pre hook outside of locks to avoid deadlock */
|
||||||
WALEdit walEdit = new WALEdit();
|
WALEdit walEdit = new WALEdit();
|
||||||
|
int cellCount = 0;
|
||||||
if (coprocessorHost != null) {
|
if (coprocessorHost != null) {
|
||||||
for (int i = 0 ; i < batchOp.operations.length; i++) {
|
for (int i = 0 ; i < batchOp.operations.length; i++) {
|
||||||
Mutation m = batchOp.getMutation(i);
|
Mutation m = batchOp.getMutation(i);
|
||||||
|
@ -2993,14 +2995,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
if (!walEdit.isEmpty()) {
|
if (!walEdit.isEmpty()) {
|
||||||
batchOp.walEditsFromCoprocessors[i] = walEdit;
|
batchOp.walEditsFromCoprocessors[i] = walEdit;
|
||||||
|
cellCount += walEdit.size();
|
||||||
walEdit = new WALEdit();
|
walEdit = new WALEdit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return cellCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
|
private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp, int cellCount)
|
||||||
|
throws IOException {
|
||||||
boolean isInReplay = batchOp.isInReplay();
|
boolean isInReplay = batchOp.isInReplay();
|
||||||
// variable to note if all Put items are for the same CF -- metrics related
|
// variable to note if all Put items are for the same CF -- metrics related
|
||||||
boolean putsCfSetConsistent = true;
|
boolean putsCfSetConsistent = true;
|
||||||
|
@ -3012,7 +3017,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
Set<byte[]> deletesCfSet = null;
|
Set<byte[]> deletesCfSet = null;
|
||||||
|
|
||||||
long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
|
long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
|
||||||
WALEdit walEdit = new WALEdit(isInReplay);
|
WALEdit walEdit = null;
|
||||||
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
|
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
|
||||||
long txid = 0;
|
long txid = 0;
|
||||||
boolean doRollBackMemstore = false;
|
boolean doRollBackMemstore = false;
|
||||||
|
@ -3043,7 +3048,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
|
Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
|
||||||
// store the family map reference to allow for mutations
|
// store the family map reference to allow for mutations
|
||||||
familyMaps[lastIndexExclusive] = familyMap;
|
familyMaps[lastIndexExclusive] = familyMap;
|
||||||
|
|
||||||
// skip anything that "ran" already
|
// skip anything that "ran" already
|
||||||
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
|
if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
|
||||||
!= OperationStatusCode.NOT_RUN) {
|
!= OperationStatusCode.NOT_RUN) {
|
||||||
|
@ -3150,8 +3154,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
noOfDeletes++;
|
noOfDeletes++;
|
||||||
}
|
}
|
||||||
rewriteCellTags(familyMaps[i], mutation);
|
rewriteCellTags(familyMaps[i], mutation);
|
||||||
|
for (List<Cell> cells : familyMaps[i].values()) {
|
||||||
|
cellCount += cells.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
walEdit = new WALEdit(cellCount);
|
||||||
lock(this.updatesLock.readLock(), numReadyToWrite);
|
lock(this.updatesLock.readLock(), numReadyToWrite);
|
||||||
locked = true;
|
locked = true;
|
||||||
|
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class WALEdit implements Writable, HeapSize {
|
||||||
private final int VERSION_2 = -1;
|
private final int VERSION_2 = -1;
|
||||||
private final boolean isReplay;
|
private final boolean isReplay;
|
||||||
|
|
||||||
private ArrayList<Cell> cells = new ArrayList<Cell>(1);
|
private ArrayList<Cell> cells = null;
|
||||||
|
|
||||||
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
|
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
|
||||||
|
|
||||||
|
@ -118,6 +118,12 @@ public class WALEdit implements Writable, HeapSize {
|
||||||
|
|
||||||
public WALEdit(boolean isReplay) {
|
public WALEdit(boolean isReplay) {
|
||||||
this.isReplay = isReplay;
|
this.isReplay = isReplay;
|
||||||
|
cells = new ArrayList<Cell>(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WALEdit(int cellCount) {
|
||||||
|
this.isReplay = false;
|
||||||
|
cells = new ArrayList<Cell>(cellCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue