HBASE-15134 Add visibility into Flush and Compaction queues

This commit is contained in:
Abhishek Singh Chouhan 2017-07-27 20:41:13 +05:30
parent c5d3de0cd4
commit 2d06a06ba4
10 changed files with 163 additions and 3 deletions

View File

@ -30,11 +30,19 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount"; String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount";
String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount"; String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount";
String LAST_MAJOR_COMPACTION_AGE = "lastMajorCompactionAge"; String LAST_MAJOR_COMPACTION_AGE = "lastMajorCompactionAge";
String COMPACTIONS_QUEUED_COUNT = "compactionsQueuedCount";
String MAX_COMPACTION_QUEUE_SIZE = "maxCompactionQueueSize";
String NUM_BYTES_COMPACTED_COUNT = "numBytesCompactedCount"; String NUM_BYTES_COMPACTED_COUNT = "numBytesCompactedCount";
String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount"; String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount";
String FLUSHES_QUEUED_COUNT = "flushesQueuedCount";
String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize";
String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed."; String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed.";
String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed."; String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed.";
String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds."; String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds.";
String COMPACTIONS_QUEUED_DESC = "Number of compactions that are queued/running for this region";
String MAX_COMPACTION_QUEUE_DESC = "Max number of compactions queued for this region";
String FLUSHES_QUEUED_DESC = "Number flushes requested/queued for this region";
String MAX_FLUSH_QUEUE_DESC = "Max number of flushes queued for this region";
String NUM_BYTES_COMPACTED_DESC = String NUM_BYTES_COMPACTED_DESC =
"Sum of filesize on all files entering a finished, successful or aborted, compaction"; "Sum of filesize on all files entering a finished, successful or aborted, compaction";
String NUM_FILES_COMPACTED_DESC = String NUM_FILES_COMPACTED_DESC =

View File

@ -117,6 +117,30 @@ public interface MetricsRegionWrapper {
*/ */
long getNumCompactionsFailed(); long getNumCompactionsFailed();
/**
* @return the total number of compactions that are currently queued(or being executed) at point in
* time
*/
long getNumCompactionsQueued();
/**
* @return the total number of flushes currently queued(being executed) for this region at point in
* time
*/
long getNumFlushesQueued();
/**
* @return the max number of compactions queued for this region
* Note that this metric is updated periodically and hence might miss some data points
*/
long getMaxCompactionQueueSize();
/**
* @return the max number of flushes queued for this region
* Note that this metric is updated periodically and hence might miss some data points
*/
long getMaxFlushQueueSize();
int getRegionHashCode(); int getRegionHashCode();
/** /**

View File

@ -270,9 +270,26 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT, regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC), MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
this.regionWrapper.getWriteRequestCount()); this.regionWrapper.getWriteRequestCount());
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionSource.REPLICA_ID, mrb.addCounter(Interns.info(
regionNamePrefix + MetricsRegionSource.REPLICA_ID,
MetricsRegionSource.REPLICA_ID_DESC), MetricsRegionSource.REPLICA_ID_DESC),
this.regionWrapper.getReplicaId()); this.regionWrapper.getReplicaId());
mrb.addCounter(Interns.info(
regionNamePrefix + MetricsRegionSource.COMPACTIONS_QUEUED_COUNT,
MetricsRegionSource.COMPACTIONS_QUEUED_DESC),
this.regionWrapper.getNumCompactionsQueued());
mrb.addCounter(Interns.info(
regionNamePrefix + MetricsRegionSource.FLUSHES_QUEUED_COUNT,
MetricsRegionSource.FLUSHES_QUEUED_DESC),
this.regionWrapper.getNumFlushesQueued());
mrb.addCounter(Interns.info(
regionNamePrefix + MetricsRegionSource.MAX_COMPACTION_QUEUE_SIZE,
MetricsRegionSource.MAX_COMPACTION_QUEUE_DESC),
this.regionWrapper.getMaxCompactionQueueSize());
mrb.addCounter(Interns.info(
regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE,
MetricsRegionSource.MAX_FLUSH_QUEUE_DESC),
this.regionWrapper.getMaxFlushQueueSize());
} }
} }

View File

@ -173,5 +173,25 @@ public class TestMetricsRegionSourceImpl {
public int getReplicaId() { public int getReplicaId() {
return 0; return 0;
} }
@Override
public long getNumCompactionsQueued() {
return 0;
}
@Override
public long getNumFlushesQueued() {
return 0;
}
@Override
public long getMaxCompactionQueueSize() {
return 0;
}
@Override
public long getMaxFlushQueueSize() {
return 0;
}
} }
} }

View File

@ -344,6 +344,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
? longCompactions : shortCompactions; ? longCompactions : shortCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool, user)); pool.execute(new CompactionRunner(s, r, compaction, pool, user));
((HRegion)r).incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large "; String type = (pool == shortCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
@ -484,9 +485,13 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex); LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem(); server.checkFileSystem();
region.decrementCompactionsQueuedCount();
return; return;
} }
if (this.compaction == null) return; // nothing to do if (this.compaction == null) {
region.decrementCompactionsQueuedCount();
return; // nothing to do
}
// Now see if we are in correct pool for the size; if not, go to the correct one. // Now see if we are in correct pool for the size; if not, go to the correct one.
// We might end up waiting for a while, so cancel the selection. // We might end up waiting for a while, so cancel the selection.
assert this.compaction.hasSelection(); assert this.compaction.hasSelection();
@ -539,6 +544,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
region.reportCompactionRequestFailure(); region.reportCompactionRequestFailure();
server.checkFileSystem(); server.checkFileSystem();
} finally { } finally {
region.decrementCompactionsQueuedCount();
LOG.debug("CompactSplitThread Status: " + CompactSplit.this); LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
} }
this.compaction.getRequest().afterExecute(); this.compaction.getRequest().afterExecute();
@ -549,6 +555,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
Preconditions.checkNotNull(server); Preconditions.checkNotNull(server);
if (server.isStopped() if (server.isStopped()
|| (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) { || (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return; return;
} }
doCompaction(user); doCompaction(user);

View File

@ -285,6 +285,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final AtomicLong compactionsFailed = new AtomicLong(0L); final AtomicLong compactionsFailed = new AtomicLong(0L);
final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
final AtomicLong compactionsQueued = new AtomicLong(0L);
final AtomicLong flushesQueued = new AtomicLong(0L);
private final WAL wal; private final WAL wal;
private final HRegionFileSystem fs; private final HRegionFileSystem fs;
@ -2263,6 +2265,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postFlush(); coprocessorHost.postFlush();
} }
if(fs.isFlushSucceeded()) {
flushesQueued.set(0L);
}
status.markComplete("Flush successful"); status.markComplete("Flush successful");
return fs; return fs;
} finally { } finally {
@ -7681,7 +7687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align( public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + ClassSize.OBJECT +
ClassSize.ARRAY + ClassSize.ARRAY +
49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + 51 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(15 * Bytes.SIZEOF_LONG) + (15 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN); 6 * Bytes.SIZEOF_BOOLEAN);
@ -8157,6 +8163,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
compactionsFailed.incrementAndGet(); compactionsFailed.incrementAndGet();
} }
public void incrementCompactionsQueuedCount() {
compactionsQueued.incrementAndGet();
}
public void decrementCompactionsQueuedCount() {
compactionsQueued.decrementAndGet();
}
public void incrementFlushesQueuedCount() {
flushesQueued.incrementAndGet();
}
@VisibleForTesting @VisibleForTesting
public long getReadPoint() { public long getReadPoint() {
return getReadPoint(IsolationLevel.READ_COMMITTED); return getReadPoint(IsolationLevel.READ_COMMITTED);

View File

@ -352,6 +352,7 @@ class MemStoreFlusher implements FlushRequester {
@Override @Override
public void requestFlush(Region r, boolean forceFlushAllStores) { public void requestFlush(Region r, boolean forceFlushAllStores) {
((HRegion)r).incrementFlushesQueuedCount();
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) { if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush // This entry has no delay so it will be added at the top of the flush
@ -365,6 +366,7 @@ class MemStoreFlusher implements FlushRequester {
@Override @Override
public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
((HRegion)r).incrementFlushesQueuedCount();
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) { if (!regionsInQueue.containsKey(r)) {
// This entry has some delay // This entry has some delay

View File

@ -52,6 +52,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
private long minStoreFileAge; private long minStoreFileAge;
private long avgStoreFileAge; private long avgStoreFileAge;
private long numReferenceFiles; private long numReferenceFiles;
private long maxFlushQueueSize;
private long maxCompactionQueueSize;
private ScheduledFuture<?> regionMetricsUpdateTask; private ScheduledFuture<?> regionMetricsUpdateTask;
@ -162,6 +164,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
return this.region.compactionsFailed.get(); return this.region.compactionsFailed.get();
} }
@Override
public long getNumCompactionsQueued() {
return this.region.compactionsQueued.get();
}
@Override
public long getNumFlushesQueued() {
return this.region.flushesQueued.get();
}
@Override
public long getMaxCompactionQueueSize() {
return maxCompactionQueueSize;
}
@Override
public long getMaxFlushQueueSize() {
return maxFlushQueueSize;
}
@Override @Override
public long getMaxStoreFileAge() { public long getMaxStoreFileAge() {
return maxStoreFileAge; return maxStoreFileAge;
@ -197,6 +219,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
long tempMaxStoreFileAge = 0; long tempMaxStoreFileAge = 0;
long tempMinStoreFileAge = Long.MAX_VALUE; long tempMinStoreFileAge = Long.MAX_VALUE;
long tempNumReferenceFiles = 0; long tempNumReferenceFiles = 0;
long tempMaxCompactionQueueSize = 0;
long tempMaxFlushQueueSize = 0;
long avgAgeNumerator = 0; long avgAgeNumerator = 0;
long numHFiles = 0; long numHFiles = 0;
@ -234,6 +258,14 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
} }
numReferenceFiles = tempNumReferenceFiles; numReferenceFiles = tempNumReferenceFiles;
tempMaxCompactionQueueSize = getNumCompactionsQueued();
tempMaxFlushQueueSize = getNumFlushesQueued();
if (tempMaxCompactionQueueSize > maxCompactionQueueSize) {
maxCompactionQueueSize = tempMaxCompactionQueueSize;
}
if (tempMaxFlushQueueSize > maxFlushQueueSize) {
maxFlushQueueSize = tempMaxFlushQueueSize;
}
} }
} }

View File

@ -142,4 +142,24 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
public int getReplicaId() { public int getReplicaId() {
return replicaid; return replicaid;
} }
@Override
public long getNumCompactionsQueued() {
return 4;
}
@Override
public long getNumFlushesQueued() {
return 6;
}
@Override
public long getMaxCompactionQueueSize() {
return 4;
}
@Override
public long getMaxFlushQueueSize() {
return 6;
}
} }

View File

@ -85,6 +85,18 @@ public class TestMetricsRegion {
HELPER.assertCounter( HELPER.assertCounter(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_replicaid", "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_replicaid",
1, agg); 1, agg);
HELPER.assertCounter(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_compactionsQueuedCount",
4, agg);
HELPER.assertCounter(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_flushesQueuedCount",
6, agg);
HELPER.assertCounter(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxCompactionQueueSize",
4, agg);
HELPER.assertCounter(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxFlushQueueSize",
6, agg);
mr.close(); mr.close();
} }
} }