HBASE-20542-ADDENDUM: fix TestHStore

This commit is contained in:
eshcar 2018-07-11 11:25:49 +03:00
parent 1e0650955a
commit 1804b6d059
3 changed files with 40 additions and 30 deletions

View File

@ -240,6 +240,15 @@ public class CompactingMemStore extends AbstractMemStore {
return mss.getDataSize() > 0? mss: getActive().getMemStoreSize(); return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
} }
public void setInMemoryCompactionCompleted() {
inMemoryCompactionInProgress.set(false);
}
protected boolean setInMemoryCompactionFlag() {
return inMemoryCompactionInProgress.compareAndSet(false, true);
}
@Override @Override
protected long keySize() { protected long keySize() {
// Need to consider dataSize/keySize of all segments in pipeline and active // Need to consider dataSize/keySize of all segments in pipeline and active
@ -419,7 +428,7 @@ public class CompactingMemStore extends AbstractMemStore {
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) { if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
if (currActive.setInMemoryFlushed()) { if (currActive.setInMemoryFlushed()) {
flushInMemory(currActive); flushInMemory(currActive);
if (inMemoryCompactionInProgress.compareAndSet(false, true)) { if (setInMemoryCompactionFlag()) {
// The thread is dispatched to do in-memory compaction in the background // The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -455,21 +464,19 @@ public class CompactingMemStore extends AbstractMemStore {
// setting the inMemoryCompactionInProgress flag again for the case this method is invoked // setting the inMemoryCompactionInProgress flag again for the case this method is invoked
// directly (only in tests) in the common path setting from true to true is idempotent // directly (only in tests) in the common path setting from true to true is idempotent
inMemoryCompactionInProgress.set(true); inMemoryCompactionInProgress.set(true);
// Used by tests
if (!allowCompaction.get()) {
return;
}
try { try {
// Used by tests // Speculative compaction execution, may be interrupted if flush is forced while
if (!allowCompaction.get()) { // compaction is in progress
return; if(!compactor.start()) {
setInMemoryCompactionCompleted();
} }
try { } catch (IOException e) {
// Speculative compaction execution, may be interrupted if flush is forced while LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
// compaction is in progress getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
compactor.start();
} catch (IOException e) {
LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
}
} finally {
inMemoryCompactionInProgress.set(false);
} }
} }

View File

@ -147,20 +147,19 @@ public class MemStoreCompactor {
private void doCompaction() { private void doCompaction() {
ImmutableSegment result = null; ImmutableSegment result = null;
boolean resultSwapped = false; boolean resultSwapped = false;
if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
return; // the compaction also doesn't start when interrupted
}
MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList); MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
boolean merge = boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
(nextStep == MemStoreCompactionStrategy.Action.MERGE || nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
try { try {
if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
return; // the compaction also doesn't start when interrupted
}
if (nextStep == MemStoreCompactionStrategy.Action.NOOP) { if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
return; return;
} }
if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN || if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN
nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
// some Segment in the pipeline is with SkipList index, make it flat // some Segment in the pipeline is with SkipList index, make it flat
compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep); compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
return; return;
@ -195,6 +194,7 @@ public class MemStoreCompactor {
result.close(); result.close();
} }
releaseResources(); releaseResources();
compactingMemStore.setInMemoryCompactionCompleted();
} }
} }

View File

@ -1738,15 +1738,15 @@ public class TestHStore {
@Override @Override
public boolean start() throws IOException { public boolean start() throws IOException {
boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0; boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
boolean rval = super.start();
if (isFirst) { if (isFirst) {
try { try {
START_COMPACTOR_LATCH.await(); START_COMPACTOR_LATCH.await();
return super.start();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
return rval; return super.start();
} }
} }
@ -1765,12 +1765,15 @@ public class TestHStore {
} }
@Override @Override
void inMemoryCompaction() { protected boolean setInMemoryCompactionFlag() {
RUNNER_COUNT.incrementAndGet(); boolean rval = super.setInMemoryCompactionFlag();
if (LOG.isDebugEnabled()) { if (rval) {
LOG.debug("runner count: " + RUNNER_COUNT.get()); RUNNER_COUNT.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("runner count: " + RUNNER_COUNT.get());
}
} }
super.inMemoryCompaction(); return rval;
} }
} }