HBASE-16162 Compacting Memstore : unnecessary push of active segments to pipeline.
This commit is contained in:
parent
ae92668dd6
commit
581d2b7de5
|
@ -67,7 +67,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
// the threshold on active size for in-memory flush
|
||||
private long inmemoryFlushSize;
|
||||
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
|
||||
@VisibleForTesting
|
||||
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
|
||||
|
||||
public CompactingMemStore(Configuration conf, CellComparator c,
|
||||
|
@ -199,10 +198,6 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
return list;
|
||||
}
|
||||
|
||||
public void setInMemoryFlushInProgress(boolean inMemoryFlushInProgress) {
|
||||
this.inMemoryFlushInProgress.set(inMemoryFlushInProgress);
|
||||
}
|
||||
|
||||
public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
|
||||
ImmutableSegment result) {
|
||||
return pipeline.swap(versionedList, result);
|
||||
|
@ -275,31 +270,38 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
// otherwise there is a deadlock
|
||||
@VisibleForTesting
|
||||
void flushInMemory() throws IOException {
|
||||
// Phase I: Update the pipeline
|
||||
getRegionServices().blockUpdates();
|
||||
// setting the inMemoryFlushInProgress flag again for the case this method is invoked
|
||||
// directly (only in tests) in the common path setting from true to true is idempotent
|
||||
// Speculative compaction execution, may be interrupted if flush is forced while
|
||||
// compaction is in progress
|
||||
inMemoryFlushInProgress.set(true);
|
||||
try {
|
||||
MutableSegment active = getActive();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
|
||||
+ "and initiating compaction.");
|
||||
// Phase I: Update the pipeline
|
||||
getRegionServices().blockUpdates();
|
||||
try {
|
||||
MutableSegment active = getActive();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
|
||||
+ "and initiating compaction.");
|
||||
}
|
||||
pushActiveToPipeline(active);
|
||||
} finally {
|
||||
getRegionServices().unblockUpdates();
|
||||
}
|
||||
pushActiveToPipeline(active);
|
||||
} finally {
|
||||
getRegionServices().unblockUpdates();
|
||||
}
|
||||
// Phase II: Compact the pipeline
|
||||
try {
|
||||
if (allowCompaction.get() && inMemoryFlushInProgress.compareAndSet(false, true)) {
|
||||
// setting the inMemoryFlushInProgress flag again for the case this method is invoked
|
||||
// directly (only in tests) in the common path setting from true to true is idempotent
|
||||
// Speculative compaction execution, may be interrupted if flush is forced while
|
||||
// compaction is in progress
|
||||
// Used by tests
|
||||
if (!allowCompaction.get()) {
|
||||
return;
|
||||
}
|
||||
// Phase II: Compact the pipeline
|
||||
try {
|
||||
compactor.startCompaction();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to run memstore compaction. region "
|
||||
+ getRegionServices().getRegionInfo().getRegionNameAsString() + "store: "
|
||||
+ getFamilyName(), e);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to run memstore compaction. region "
|
||||
+ getRegionServices().getRegionInfo().getRegionNameAsString()
|
||||
+ "store: "+ getFamilyName(), e);
|
||||
} finally {
|
||||
inMemoryFlushInProgress.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -312,9 +314,9 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
}
|
||||
|
||||
private boolean shouldFlushInMemory() {
|
||||
if(getActive().getSize() > inmemoryFlushSize) {
|
||||
if (getActive().getSize() > inmemoryFlushSize) {
|
||||
// size above flush threshold
|
||||
return (allowCompaction.get() && !inMemoryFlushInProgress.get());
|
||||
return inMemoryFlushInProgress.compareAndSet(false, true);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -361,7 +363,8 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
*/
|
||||
private class InMemoryFlushRunnable implements Runnable {
|
||||
|
||||
@Override public void run() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
flushInMemory();
|
||||
} catch (IOException e) {
|
||||
|
@ -375,14 +378,17 @@ public class CompactingMemStore extends AbstractMemStore {
|
|||
//----------------------------------------------------------------------
|
||||
//methods for tests
|
||||
//----------------------------------------------------------------------
|
||||
@VisibleForTesting
|
||||
boolean isMemStoreFlushingInMemory() {
|
||||
return inMemoryFlushInProgress.get();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void disableCompaction() {
|
||||
allowCompaction.set(false);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void enableCompaction() {
|
||||
allowCompaction.set(true);
|
||||
}
|
||||
|
|
|
@ -153,7 +153,6 @@ class MemStoreCompactor {
|
|||
return;
|
||||
} finally {
|
||||
releaseResources();
|
||||
compactingMemStore.setInMemoryFlushInProgress(false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue