HBASE-15134 Add visibility into Flush and Compaction queues
This commit is contained in:
parent
34a1ae875c
commit
12b9a151e6
|
@ -30,11 +30,19 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
|
|||
String COMPACTIONS_COMPLETED_COUNT = "compactionsCompletedCount";
|
||||
String COMPACTIONS_FAILED_COUNT = "compactionsFailedCount";
|
||||
String LAST_MAJOR_COMPACTION_AGE = "lastMajorCompactionAge";
|
||||
String COMPACTIONS_QUEUED_COUNT = "compactionsQueuedCount";
|
||||
String MAX_COMPACTION_QUEUE_SIZE = "maxCompactionQueueSize";
|
||||
String NUM_BYTES_COMPACTED_COUNT = "numBytesCompactedCount";
|
||||
String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount";
|
||||
String FLUSHES_QUEUED_COUNT = "flushesQueuedCount";
|
||||
String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize";
|
||||
String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed.";
|
||||
String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed.";
|
||||
String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds.";
|
||||
String COMPACTIONS_QUEUED_DESC = "Number of compactions that are queued/running for this region";
|
||||
String MAX_COMPACTION_QUEUE_DESC = "Max number of compactions queued for this region";
|
||||
String FLUSHES_QUEUED_DESC = "Number flushes requested/queued for this region";
|
||||
String MAX_FLUSH_QUEUE_DESC = "Max number of flushes queued for this region";
|
||||
String NUM_BYTES_COMPACTED_DESC =
|
||||
"Sum of filesize on all files entering a finished, successful or aborted, compaction";
|
||||
String NUM_FILES_COMPACTED_DESC =
|
||||
|
|
|
@ -117,6 +117,30 @@ public interface MetricsRegionWrapper {
|
|||
*/
|
||||
long getNumCompactionsFailed();
|
||||
|
||||
/**
|
||||
* @return the total number of compactions that are currently queued(or being executed) at point in
|
||||
* time
|
||||
*/
|
||||
long getNumCompactionsQueued();
|
||||
|
||||
/**
|
||||
* @return the total number of flushes currently queued(being executed) for this region at point in
|
||||
* time
|
||||
*/
|
||||
long getNumFlushesQueued();
|
||||
|
||||
/**
|
||||
* @return the max number of compactions queued for this region
|
||||
* Note that this metric is updated periodically and hence might miss some data points
|
||||
*/
|
||||
long getMaxCompactionQueueSize();
|
||||
|
||||
/**
|
||||
* @return the max number of flushes queued for this region
|
||||
* Note that this metric is updated periodically and hence might miss some data points
|
||||
*/
|
||||
long getMaxFlushQueueSize();
|
||||
|
||||
int getRegionHashCode();
|
||||
|
||||
/**
|
||||
|
|
|
@ -270,9 +270,26 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
|
||||
MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
|
||||
this.regionWrapper.getWriteRequestCount());
|
||||
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionSource.REPLICA_ID,
|
||||
mrb.addCounter(Interns.info(
|
||||
regionNamePrefix + MetricsRegionSource.REPLICA_ID,
|
||||
MetricsRegionSource.REPLICA_ID_DESC),
|
||||
this.regionWrapper.getReplicaId());
|
||||
mrb.addCounter(Interns.info(
|
||||
regionNamePrefix + MetricsRegionSource.COMPACTIONS_QUEUED_COUNT,
|
||||
MetricsRegionSource.COMPACTIONS_QUEUED_DESC),
|
||||
this.regionWrapper.getNumCompactionsQueued());
|
||||
mrb.addCounter(Interns.info(
|
||||
regionNamePrefix + MetricsRegionSource.FLUSHES_QUEUED_COUNT,
|
||||
MetricsRegionSource.FLUSHES_QUEUED_DESC),
|
||||
this.regionWrapper.getNumFlushesQueued());
|
||||
mrb.addCounter(Interns.info(
|
||||
regionNamePrefix + MetricsRegionSource.MAX_COMPACTION_QUEUE_SIZE,
|
||||
MetricsRegionSource.MAX_COMPACTION_QUEUE_DESC),
|
||||
this.regionWrapper.getMaxCompactionQueueSize());
|
||||
mrb.addCounter(Interns.info(
|
||||
regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE,
|
||||
MetricsRegionSource.MAX_FLUSH_QUEUE_DESC),
|
||||
this.regionWrapper.getMaxFlushQueueSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -173,5 +173,25 @@ public class TestMetricsRegionSourceImpl {
|
|||
public int getReplicaId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumCompactionsQueued() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumFlushesQueued() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxCompactionQueueSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxFlushQueueSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -344,6 +344,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
|
||||
? longCompactions : shortCompactions;
|
||||
pool.execute(new CompactionRunner(s, r, compaction, pool, user));
|
||||
((HRegion)r).incrementCompactionsQueuedCount();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
||||
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
|
||||
|
@ -484,9 +485,13 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
} catch (IOException ex) {
|
||||
LOG.error("Compaction selection failed " + this, ex);
|
||||
server.checkFileSystem();
|
||||
region.decrementCompactionsQueuedCount();
|
||||
return;
|
||||
}
|
||||
if (this.compaction == null) return; // nothing to do
|
||||
if (this.compaction == null) {
|
||||
region.decrementCompactionsQueuedCount();
|
||||
return; // nothing to do
|
||||
}
|
||||
// Now see if we are in correct pool for the size; if not, go to the correct one.
|
||||
// We might end up waiting for a while, so cancel the selection.
|
||||
assert this.compaction.hasSelection();
|
||||
|
@ -539,6 +544,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
region.reportCompactionRequestFailure();
|
||||
server.checkFileSystem();
|
||||
} finally {
|
||||
region.decrementCompactionsQueuedCount();
|
||||
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
|
||||
}
|
||||
this.compaction.getRequest().afterExecute();
|
||||
|
@ -549,6 +555,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
Preconditions.checkNotNull(server);
|
||||
if (server.isStopped()
|
||||
|| (region.getTableDescriptor() != null && !region.getTableDescriptor().isCompactionEnabled())) {
|
||||
region.decrementCompactionsQueuedCount();
|
||||
return;
|
||||
}
|
||||
doCompaction(user);
|
||||
|
|
|
@ -285,6 +285,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
final AtomicLong compactionsFailed = new AtomicLong(0L);
|
||||
final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
|
||||
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
|
||||
final AtomicLong compactionsQueued = new AtomicLong(0L);
|
||||
final AtomicLong flushesQueued = new AtomicLong(0L);
|
||||
|
||||
private final WAL wal;
|
||||
private final HRegionFileSystem fs;
|
||||
|
@ -2263,6 +2265,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
coprocessorHost.postFlush();
|
||||
}
|
||||
|
||||
if(fs.isFlushSucceeded()) {
|
||||
flushesQueued.set(0L);
|
||||
}
|
||||
|
||||
status.markComplete("Flush successful");
|
||||
return fs;
|
||||
} finally {
|
||||
|
@ -7681,7 +7687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT +
|
||||
ClassSize.ARRAY +
|
||||
49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||
51 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||
(15 * Bytes.SIZEOF_LONG) +
|
||||
6 * Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
|
@ -8157,6 +8163,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
compactionsFailed.incrementAndGet();
|
||||
}
|
||||
|
||||
public void incrementCompactionsQueuedCount() {
|
||||
compactionsQueued.incrementAndGet();
|
||||
}
|
||||
|
||||
public void decrementCompactionsQueuedCount() {
|
||||
compactionsQueued.decrementAndGet();
|
||||
}
|
||||
|
||||
public void incrementFlushesQueuedCount() {
|
||||
flushesQueued.incrementAndGet();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getReadPoint() {
|
||||
return getReadPoint(IsolationLevel.READ_COMMITTED);
|
||||
|
|
|
@ -352,6 +352,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
|
||||
@Override
|
||||
public void requestFlush(Region r, boolean forceFlushAllStores) {
|
||||
((HRegion)r).incrementFlushesQueuedCount();
|
||||
synchronized (regionsInQueue) {
|
||||
if (!regionsInQueue.containsKey(r)) {
|
||||
// This entry has no delay so it will be added at the top of the flush
|
||||
|
@ -365,6 +366,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
|
||||
@Override
|
||||
public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
|
||||
((HRegion)r).incrementFlushesQueuedCount();
|
||||
synchronized (regionsInQueue) {
|
||||
if (!regionsInQueue.containsKey(r)) {
|
||||
// This entry has some delay
|
||||
|
|
|
@ -52,6 +52,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
private long minStoreFileAge;
|
||||
private long avgStoreFileAge;
|
||||
private long numReferenceFiles;
|
||||
private long maxFlushQueueSize;
|
||||
private long maxCompactionQueueSize;
|
||||
|
||||
private ScheduledFuture<?> regionMetricsUpdateTask;
|
||||
|
||||
|
@ -162,6 +164,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
return this.region.compactionsFailed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumCompactionsQueued() {
|
||||
return this.region.compactionsQueued.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumFlushesQueued() {
|
||||
return this.region.flushesQueued.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxCompactionQueueSize() {
|
||||
return maxCompactionQueueSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxFlushQueueSize() {
|
||||
return maxFlushQueueSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxStoreFileAge() {
|
||||
return maxStoreFileAge;
|
||||
|
@ -197,6 +219,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
long tempMaxStoreFileAge = 0;
|
||||
long tempMinStoreFileAge = Long.MAX_VALUE;
|
||||
long tempNumReferenceFiles = 0;
|
||||
long tempMaxCompactionQueueSize = 0;
|
||||
long tempMaxFlushQueueSize = 0;
|
||||
|
||||
long avgAgeNumerator = 0;
|
||||
long numHFiles = 0;
|
||||
|
@ -234,6 +258,14 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
}
|
||||
|
||||
numReferenceFiles = tempNumReferenceFiles;
|
||||
tempMaxCompactionQueueSize = getNumCompactionsQueued();
|
||||
tempMaxFlushQueueSize = getNumFlushesQueued();
|
||||
if (tempMaxCompactionQueueSize > maxCompactionQueueSize) {
|
||||
maxCompactionQueueSize = tempMaxCompactionQueueSize;
|
||||
}
|
||||
if (tempMaxFlushQueueSize > maxFlushQueueSize) {
|
||||
maxFlushQueueSize = tempMaxFlushQueueSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -142,4 +142,24 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
|||
public int getReplicaId() {
|
||||
return replicaid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumCompactionsQueued() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumFlushesQueued() {
|
||||
return 6;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxCompactionQueueSize() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxFlushQueueSize() {
|
||||
return 6;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,6 +85,18 @@ public class TestMetricsRegion {
|
|||
HELPER.assertCounter(
|
||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_replicaid",
|
||||
1, agg);
|
||||
HELPER.assertCounter(
|
||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_compactionsQueuedCount",
|
||||
4, agg);
|
||||
HELPER.assertCounter(
|
||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_flushesQueuedCount",
|
||||
6, agg);
|
||||
HELPER.assertCounter(
|
||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxCompactionQueueSize",
|
||||
4, agg);
|
||||
HELPER.assertCounter(
|
||||
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_maxFlushQueueSize",
|
||||
6, agg);
|
||||
mr.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue