From 581d2b7de517ee29b81b62c521ef5ca27c41f38d Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Wed, 6 Jul 2016 18:54:35 +0530 Subject: [PATCH] HBASE-16162 Compacting Memstore : unnecessary push of active segments to pipeline. --- .../regionserver/CompactingMemStore.java | 64 ++++++++++--------- .../hbase/regionserver/MemStoreCompactor.java | 1 - 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 5e286def439..e27acced1bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -67,7 +67,6 @@ public class CompactingMemStore extends AbstractMemStore { // the threshold on active size for in-memory flush private long inmemoryFlushSize; private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); - @VisibleForTesting private final AtomicBoolean allowCompaction = new AtomicBoolean(true); public CompactingMemStore(Configuration conf, CellComparator c, @@ -199,10 +198,6 @@ public class CompactingMemStore extends AbstractMemStore { return list; } - public void setInMemoryFlushInProgress(boolean inMemoryFlushInProgress) { - this.inMemoryFlushInProgress.set(inMemoryFlushInProgress); - } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result) { return pipeline.swap(versionedList, result); @@ -275,31 +270,38 @@ public class CompactingMemStore extends AbstractMemStore { // otherwise there is a deadlock @VisibleForTesting void flushInMemory() throws IOException { - // Phase I: Update the pipeline - getRegionServices().blockUpdates(); + // setting the inMemoryFlushInProgress flag again for the case this method is invoked + // directly (only in tests) in the common path setting from true to true is idempotent + // Speculative compaction execution, may be interrupted if flush is forced while + // compaction is in progress + inMemoryFlushInProgress.set(true); try { - MutableSegment active = getActive(); - if (LOG.isDebugEnabled()) { - LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " - + "and initiating compaction."); + // Phase I: Update the pipeline + getRegionServices().blockUpdates(); + try { + MutableSegment active = getActive(); + if (LOG.isDebugEnabled()) { + LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " + + "and initiating compaction."); + } + pushActiveToPipeline(active); + } finally { + getRegionServices().unblockUpdates(); } - pushActiveToPipeline(active); - } finally { - getRegionServices().unblockUpdates(); - } - // Phase II: Compact the pipeline - try { - if (allowCompaction.get() && inMemoryFlushInProgress.compareAndSet(false, true)) { - // setting the inMemoryFlushInProgress flag again for the case this method is invoked - // directly (only in tests) in the common path setting from true to true is idempotent - // Speculative compaction execution, may be interrupted if flush is forced while - // compaction is in progress + // Used by tests + if (!allowCompaction.get()) { + return; + } + // Phase II: Compact the pipeline + try { compactor.startCompaction(); + } catch (IOException e) { + LOG.warn("Unable to run memstore compaction. region " + + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: " + + getFamilyName(), e); } - } catch (IOException e) { - LOG.warn("Unable to run memstore compaction. region " - + getRegionServices().getRegionInfo().getRegionNameAsString() - + "store: "+ getFamilyName(), e); + } finally { + inMemoryFlushInProgress.set(false); } } @@ -312,9 +314,9 @@ public class CompactingMemStore extends AbstractMemStore { } private boolean shouldFlushInMemory() { - if(getActive().getSize() > inmemoryFlushSize) { + if (getActive().getSize() > inmemoryFlushSize) { // size above flush threshold - return (allowCompaction.get() && !inMemoryFlushInProgress.get()); + return inMemoryFlushInProgress.compareAndSet(false, true); } return false; } @@ -361,7 +363,8 @@ public class CompactingMemStore extends AbstractMemStore { */ private class InMemoryFlushRunnable implements Runnable { - @Override public void run() { + @Override + public void run() { try { flushInMemory(); } catch (IOException e) { @@ -375,14 +378,17 @@ public class CompactingMemStore extends AbstractMemStore { //---------------------------------------------------------------------- //methods for tests //---------------------------------------------------------------------- + @VisibleForTesting boolean isMemStoreFlushingInMemory() { return inMemoryFlushInProgress.get(); } + @VisibleForTesting void disableCompaction() { allowCompaction.set(false); } + @VisibleForTesting void enableCompaction() { allowCompaction.set(true); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index d86cd32841d..65d3af634a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -153,7 +153,6 @@ class MemStoreCompactor { return; } finally { releaseResources(); - compactingMemStore.setInMemoryFlushInProgress(false); } }