HBASE-15464 Flush / Compaction metrics revisited
This commit is contained in:
parent
75252af3a9
commit
797562e6c3
|
@ -146,6 +146,53 @@ public interface MetricsRegionServerSource extends BaseSource {
|
||||||
*/
|
*/
|
||||||
void updateFlushTime(long t);
|
void updateFlushTime(long t);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the flush memstore size histogram
|
||||||
|
* @param bytes the number of bytes in the memstore
|
||||||
|
*/
|
||||||
|
void updateFlushMemstoreSize(long bytes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the flush output file size histogram
|
||||||
|
* @param bytes the number of bytes in the output file
|
||||||
|
*/
|
||||||
|
void updateFlushOutputSize(long bytes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the compaction time histogram, both major and minor
|
||||||
|
* @param isMajor whether compaction is a major compaction
|
||||||
|
* @param t time it took, in milliseconds
|
||||||
|
*/
|
||||||
|
void updateCompactionTime(boolean isMajor, long t);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the compaction input number of files histogram
|
||||||
|
* @param isMajor whether compaction is a major compaction
|
||||||
|
* @param c number of files
|
||||||
|
*/
|
||||||
|
void updateCompactionInputFileCount(boolean isMajor, long c);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the compaction total input file size histogram
|
||||||
|
* @param isMajor whether compaction is a major compaction
|
||||||
|
* @param bytes the number of bytes of the compaction input file
|
||||||
|
*/
|
||||||
|
void updateCompactionInputSize(boolean isMajor, long bytes);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the compaction output number of files histogram
|
||||||
|
* @param isMajor whether compaction is a major compaction
|
||||||
|
* @param c number of files
|
||||||
|
*/
|
||||||
|
void updateCompactionOutputFileCount(boolean isMajor, long c);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the compaction total output file size
|
||||||
|
* @param isMajor whether compaction is a major compaction
|
||||||
|
* @param bytes the number of bytes of the compaction input file
|
||||||
|
*/
|
||||||
|
void updateCompactionOutputSize(boolean isMajor, long bytes);
|
||||||
|
|
||||||
// Strings used for exporting to metrics system.
|
// Strings used for exporting to metrics system.
|
||||||
String REGION_COUNT = "regionCount";
|
String REGION_COUNT = "regionCount";
|
||||||
String REGION_COUNT_DESC = "Number of regions";
|
String REGION_COUNT_DESC = "Number of regions";
|
||||||
|
@ -212,6 +259,10 @@ public interface MetricsRegionServerSource extends BaseSource {
|
||||||
String LARGE_COMPACTION_QUEUE_LENGTH = "largeCompactionQueueLength";
|
String LARGE_COMPACTION_QUEUE_LENGTH = "largeCompactionQueueLength";
|
||||||
String SMALL_COMPACTION_QUEUE_LENGTH = "smallCompactionQueueLength";
|
String SMALL_COMPACTION_QUEUE_LENGTH = "smallCompactionQueueLength";
|
||||||
String COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions.";
|
String COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions.";
|
||||||
|
String LARGE_COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions with input size "
|
||||||
|
+ "larger than throttle threshold (2.5GB by default)";
|
||||||
|
String SMALL_COMPACTION_QUEUE_LENGTH_DESC = "Length of the queue for compactions with input size "
|
||||||
|
+ "smaller than throttle threshold (2.5GB by default)";
|
||||||
String FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
String FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
||||||
String FLUSH_QUEUE_LENGTH_DESC = "Length of the queue for region flushes";
|
String FLUSH_QUEUE_LENGTH_DESC = "Length of the queue for region flushes";
|
||||||
String BLOCK_CACHE_FREE_SIZE = "blockCacheFreeSize";
|
String BLOCK_CACHE_FREE_SIZE = "blockCacheFreeSize";
|
||||||
|
@ -345,7 +396,61 @@ public interface MetricsRegionServerSource extends BaseSource {
|
||||||
String SPLIT_REQUEST_DESC = "Number of splits requested";
|
String SPLIT_REQUEST_DESC = "Number of splits requested";
|
||||||
String SPLIT_SUCCESS_KEY = "splitSuccessCount";
|
String SPLIT_SUCCESS_KEY = "splitSuccessCount";
|
||||||
String SPLIT_SUCCESS_DESC = "Number of successfully executed splits";
|
String SPLIT_SUCCESS_DESC = "Number of successfully executed splits";
|
||||||
String FLUSH_KEY = "flushTime";
|
|
||||||
|
String FLUSH_TIME = "flushTime";
|
||||||
|
String FLUSH_TIME_DESC = "Histogram for the time in millis for memstore flush";
|
||||||
|
String FLUSH_MEMSTORE_SIZE = "flushMemstoreSize";
|
||||||
|
String FLUSH_MEMSTORE_SIZE_DESC = "Histogram for number of bytes in the memstore for a flush";
|
||||||
|
String FLUSH_OUTPUT_SIZE = "flushOutputSize";
|
||||||
|
String FLUSH_OUTPUT_SIZE_DESC = "Histogram for number of bytes in the resulting file for a flush";
|
||||||
|
String FLUSHED_OUTPUT_BYTES = "flushedOutputBytes";
|
||||||
|
String FLUSHED_OUTPUT_BYTES_DESC = "Total number of bytes written from flush";
|
||||||
|
String FLUSHED_MEMSTORE_BYTES = "flushedMemstoreBytes";
|
||||||
|
String FLUSHED_MEMSTORE_BYTES_DESC = "Total number of bytes of cells in memstore from flush";
|
||||||
|
|
||||||
|
String COMPACTION_TIME = "compactionTime";
|
||||||
|
String COMPACTION_TIME_DESC
|
||||||
|
= "Histogram for the time in millis for compaction, both major and minor";
|
||||||
|
String COMPACTION_INPUT_FILE_COUNT = "compactionInputFileCount";
|
||||||
|
String COMPACTION_INPUT_FILE_COUNT_DESC
|
||||||
|
= "Histogram for the compaction input number of files, both major and minor";
|
||||||
|
String COMPACTION_INPUT_SIZE = "compactionInputSize";
|
||||||
|
String COMPACTION_INPUT_SIZE_DESC
|
||||||
|
= "Histogram for the compaction total input file sizes, both major and minor";
|
||||||
|
String COMPACTION_OUTPUT_FILE_COUNT = "compactionOutputFileCount";
|
||||||
|
String COMPACTION_OUTPUT_FILE_COUNT_DESC
|
||||||
|
= "Histogram for the compaction output number of files, both major and minor";
|
||||||
|
String COMPACTION_OUTPUT_SIZE = "compactionOutputSize";
|
||||||
|
String COMPACTION_OUTPUT_SIZE_DESC
|
||||||
|
= "Histogram for the compaction total output file sizes, both major and minor";
|
||||||
|
String COMPACTED_INPUT_BYTES = "compactedInputBytes";
|
||||||
|
String COMPACTED_INPUT_BYTES_DESC
|
||||||
|
= "Total number of bytes that is read for compaction, both major and minor";
|
||||||
|
String COMPACTED_OUTPUT_BYTES = "compactedOutputBytes";
|
||||||
|
String COMPACTED_OUTPUT_BYTES_DESC
|
||||||
|
= "Total number of bytes that is output from compaction, both major and minor";
|
||||||
|
|
||||||
|
String MAJOR_COMPACTION_TIME = "majorCompactionTime";
|
||||||
|
String MAJOR_COMPACTION_TIME_DESC
|
||||||
|
= "Histogram for the time in millis for compaction, major only";
|
||||||
|
String MAJOR_COMPACTION_INPUT_FILE_COUNT = "majorCompactionInputFileCount";
|
||||||
|
String MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC
|
||||||
|
= "Histogram for the compaction input number of files, major only";
|
||||||
|
String MAJOR_COMPACTION_INPUT_SIZE = "majorCompactionInputSize";
|
||||||
|
String MAJOR_COMPACTION_INPUT_SIZE_DESC
|
||||||
|
= "Histogram for the compaction total input file sizes, major only";
|
||||||
|
String MAJOR_COMPACTION_OUTPUT_FILE_COUNT = "majorCompactionOutputFileCount";
|
||||||
|
String MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC
|
||||||
|
= "Histogram for the compaction output number of files, major only";
|
||||||
|
String MAJOR_COMPACTION_OUTPUT_SIZE = "majorCompactionOutputSize";
|
||||||
|
String MAJOR_COMPACTION_OUTPUT_SIZE_DESC
|
||||||
|
= "Histogram for the compaction total output file sizes, major only";
|
||||||
|
String MAJOR_COMPACTED_INPUT_BYTES = "majorCompactedInputBytes";
|
||||||
|
String MAJOR_COMPACTED_INPUT_BYTES_DESC
|
||||||
|
= "Total number of bytes that is read for compaction, major only";
|
||||||
|
String MAJOR_COMPACTED_OUTPUT_BYTES = "majorCompactedOutputBytes";
|
||||||
|
String MAJOR_COMPACTED_OUTPUT_BYTES_DESC
|
||||||
|
= "Total number of bytes that is output from compaction, major only";
|
||||||
|
|
||||||
String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
|
String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
|
||||||
String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
|
String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.metrics2.lib.MutableFastCounter;
|
||||||
public class MetricsRegionServerSourceImpl
|
public class MetricsRegionServerSourceImpl
|
||||||
extends BaseSourceImpl implements MetricsRegionServerSource {
|
extends BaseSourceImpl implements MetricsRegionServerSource {
|
||||||
|
|
||||||
|
|
||||||
final MetricsRegionServerWrapper rsWrap;
|
final MetricsRegionServerWrapper rsWrap;
|
||||||
private final MetricHistogram putHisto;
|
private final MetricHistogram putHisto;
|
||||||
private final MetricHistogram deleteHisto;
|
private final MetricHistogram deleteHisto;
|
||||||
|
@ -55,7 +54,30 @@ public class MetricsRegionServerSourceImpl
|
||||||
private final MutableFastCounter splitSuccess;
|
private final MutableFastCounter splitSuccess;
|
||||||
|
|
||||||
private final MetricHistogram splitTimeHisto;
|
private final MetricHistogram splitTimeHisto;
|
||||||
|
|
||||||
|
// flush related metrics
|
||||||
private final MetricHistogram flushTimeHisto;
|
private final MetricHistogram flushTimeHisto;
|
||||||
|
private final MetricHistogram flushMemstoreSizeHisto;
|
||||||
|
private final MetricHistogram flushOutputSizeHisto;
|
||||||
|
private final MutableFastCounter flushedMemstoreBytes;
|
||||||
|
private final MutableFastCounter flushedOutputBytes;
|
||||||
|
|
||||||
|
// compaction related metrics
|
||||||
|
private final MetricHistogram compactionTimeHisto;
|
||||||
|
private final MetricHistogram compactionInputFileCountHisto;
|
||||||
|
private final MetricHistogram compactionInputSizeHisto;
|
||||||
|
private final MetricHistogram compactionOutputFileCountHisto;
|
||||||
|
private final MetricHistogram compactionOutputSizeHisto;
|
||||||
|
private final MutableFastCounter compactedInputBytes;
|
||||||
|
private final MutableFastCounter compactedOutputBytes;
|
||||||
|
|
||||||
|
private final MetricHistogram majorCompactionTimeHisto;
|
||||||
|
private final MetricHistogram majorCompactionInputFileCountHisto;
|
||||||
|
private final MetricHistogram majorCompactionInputSizeHisto;
|
||||||
|
private final MetricHistogram majorCompactionOutputFileCountHisto;
|
||||||
|
private final MetricHistogram majorCompactionOutputSizeHisto;
|
||||||
|
private final MutableFastCounter majorCompactedInputBytes;
|
||||||
|
private final MutableFastCounter majorCompactedOutputBytes;
|
||||||
|
|
||||||
public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
|
public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
|
||||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
|
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
|
||||||
|
@ -88,9 +110,47 @@ public class MetricsRegionServerSourceImpl
|
||||||
scanSizeHisto = getMetricsRegistry().newSizeHistogram(SCAN_SIZE_KEY);
|
scanSizeHisto = getMetricsRegistry().newSizeHistogram(SCAN_SIZE_KEY);
|
||||||
scanTimeHisto = getMetricsRegistry().newTimeHistogram(SCAN_TIME_KEY);
|
scanTimeHisto = getMetricsRegistry().newTimeHistogram(SCAN_TIME_KEY);
|
||||||
|
|
||||||
splitTimeHisto = getMetricsRegistry().newTimeHistogram(SPLIT_KEY);
|
flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_TIME, FLUSH_TIME_DESC);
|
||||||
flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_KEY);
|
flushMemstoreSizeHisto = getMetricsRegistry()
|
||||||
|
.newSizeHistogram(FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC);
|
||||||
|
flushOutputSizeHisto = getMetricsRegistry().newSizeHistogram(FLUSH_OUTPUT_SIZE,
|
||||||
|
FLUSH_OUTPUT_SIZE_DESC);
|
||||||
|
flushedOutputBytes = getMetricsRegistry().newCounter(FLUSHED_OUTPUT_BYTES,
|
||||||
|
FLUSHED_OUTPUT_BYTES_DESC, 0L);
|
||||||
|
flushedMemstoreBytes = getMetricsRegistry().newCounter(FLUSHED_MEMSTORE_BYTES,
|
||||||
|
FLUSHED_MEMSTORE_BYTES_DESC, 0L);
|
||||||
|
|
||||||
|
compactionTimeHisto = getMetricsRegistry()
|
||||||
|
.newTimeHistogram(COMPACTION_TIME, COMPACTION_TIME_DESC);
|
||||||
|
compactionInputFileCountHisto = getMetricsRegistry()
|
||||||
|
.newHistogram(COMPACTION_INPUT_FILE_COUNT, COMPACTION_INPUT_FILE_COUNT_DESC);
|
||||||
|
compactionInputSizeHisto = getMetricsRegistry()
|
||||||
|
.newSizeHistogram(COMPACTION_INPUT_SIZE, COMPACTION_INPUT_SIZE_DESC);
|
||||||
|
compactionOutputFileCountHisto = getMetricsRegistry()
|
||||||
|
.newHistogram(COMPACTION_OUTPUT_FILE_COUNT, COMPACTION_OUTPUT_FILE_COUNT_DESC);
|
||||||
|
compactionOutputSizeHisto = getMetricsRegistry()
|
||||||
|
.newSizeHistogram(COMPACTION_OUTPUT_SIZE, COMPACTION_OUTPUT_SIZE_DESC);
|
||||||
|
compactedInputBytes = getMetricsRegistry()
|
||||||
|
.newCounter(COMPACTED_INPUT_BYTES, COMPACTED_INPUT_BYTES_DESC, 0L);
|
||||||
|
compactedOutputBytes = getMetricsRegistry()
|
||||||
|
.newCounter(COMPACTED_OUTPUT_BYTES, COMPACTED_OUTPUT_BYTES_DESC, 0L);
|
||||||
|
|
||||||
|
majorCompactionTimeHisto = getMetricsRegistry()
|
||||||
|
.newTimeHistogram(MAJOR_COMPACTION_TIME, MAJOR_COMPACTION_TIME_DESC);
|
||||||
|
majorCompactionInputFileCountHisto = getMetricsRegistry()
|
||||||
|
.newHistogram(MAJOR_COMPACTION_INPUT_FILE_COUNT, MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC);
|
||||||
|
majorCompactionInputSizeHisto = getMetricsRegistry()
|
||||||
|
.newSizeHistogram(MAJOR_COMPACTION_INPUT_SIZE, MAJOR_COMPACTION_INPUT_SIZE_DESC);
|
||||||
|
majorCompactionOutputFileCountHisto = getMetricsRegistry()
|
||||||
|
.newHistogram(MAJOR_COMPACTION_OUTPUT_FILE_COUNT, MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC);
|
||||||
|
majorCompactionOutputSizeHisto = getMetricsRegistry()
|
||||||
|
.newSizeHistogram(MAJOR_COMPACTION_OUTPUT_SIZE, MAJOR_COMPACTION_OUTPUT_SIZE_DESC);
|
||||||
|
majorCompactedInputBytes = getMetricsRegistry()
|
||||||
|
.newCounter(MAJOR_COMPACTED_INPUT_BYTES, MAJOR_COMPACTED_INPUT_BYTES_DESC, 0L);
|
||||||
|
majorCompactedOutputBytes = getMetricsRegistry()
|
||||||
|
.newCounter(MAJOR_COMPACTED_OUTPUT_BYTES, MAJOR_COMPACTED_OUTPUT_BYTES_DESC, 0L);
|
||||||
|
|
||||||
|
splitTimeHisto = getMetricsRegistry().newTimeHistogram(SPLIT_KEY);
|
||||||
splitRequest = getMetricsRegistry().newCounter(SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
|
splitRequest = getMetricsRegistry().newCounter(SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
|
||||||
splitSuccess = getMetricsRegistry().newCounter(SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
|
splitSuccess = getMetricsRegistry().newCounter(SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
|
||||||
}
|
}
|
||||||
|
@ -180,6 +240,62 @@ public class MetricsRegionServerSourceImpl
|
||||||
flushTimeHisto.add(t);
|
flushTimeHisto.add(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateFlushMemstoreSize(long bytes) {
|
||||||
|
flushMemstoreSizeHisto.add(bytes);
|
||||||
|
flushedMemstoreBytes.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateFlushOutputSize(long bytes) {
|
||||||
|
flushOutputSizeHisto.add(bytes);
|
||||||
|
flushedOutputBytes.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCompactionTime(boolean isMajor, long t) {
|
||||||
|
compactionTimeHisto.add(t);
|
||||||
|
if (isMajor) {
|
||||||
|
majorCompactionTimeHisto.add(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCompactionInputFileCount(boolean isMajor, long c) {
|
||||||
|
compactionInputFileCountHisto.add(c);
|
||||||
|
if (isMajor) {
|
||||||
|
majorCompactionInputFileCountHisto.add(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCompactionInputSize(boolean isMajor, long bytes) {
|
||||||
|
compactionInputSizeHisto.add(bytes);
|
||||||
|
compactedInputBytes.incr(bytes);
|
||||||
|
if (isMajor) {
|
||||||
|
majorCompactionInputSizeHisto.add(bytes);
|
||||||
|
majorCompactedInputBytes.incr(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCompactionOutputFileCount(boolean isMajor, long c) {
|
||||||
|
compactionOutputFileCountHisto.add(c);
|
||||||
|
if (isMajor) {
|
||||||
|
majorCompactionOutputFileCountHisto.add(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCompactionOutputSize(boolean isMajor, long bytes) {
|
||||||
|
compactionOutputSizeHisto.add(bytes);
|
||||||
|
compactedOutputBytes.incr(bytes);
|
||||||
|
if (isMajor) {
|
||||||
|
majorCompactionOutputSizeHisto.add(bytes);
|
||||||
|
majorCompactedOutputBytes.incr(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
|
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
|
||||||
* expectations of java programmers. Instead of returning anything Hadoop metrics expects
|
* expectations of java programmers. Instead of returning anything Hadoop metrics expects
|
||||||
|
@ -252,6 +368,12 @@ public class MetricsRegionServerSourceImpl
|
||||||
rsWrap.getSplitQueueSize())
|
rsWrap.getSplitQueueSize())
|
||||||
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
|
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
|
||||||
rsWrap.getCompactionQueueSize())
|
rsWrap.getCompactionQueueSize())
|
||||||
|
.addGauge(Interns.info(SMALL_COMPACTION_QUEUE_LENGTH, SMALL_COMPACTION_QUEUE_LENGTH_DESC),
|
||||||
|
rsWrap.getSmallCompactionQueueSize())
|
||||||
|
.addGauge(Interns.info(LARGE_COMPACTION_QUEUE_LENGTH, LARGE_COMPACTION_QUEUE_LENGTH_DESC),
|
||||||
|
rsWrap.getLargeCompactionQueueSize())
|
||||||
|
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
|
||||||
|
rsWrap.getCompactionQueueSize())
|
||||||
.addGauge(Interns.info(FLUSH_QUEUE_LENGTH, FLUSH_QUEUE_LENGTH_DESC),
|
.addGauge(Interns.info(FLUSH_QUEUE_LENGTH, FLUSH_QUEUE_LENGTH_DESC),
|
||||||
rsWrap.getFlushQueueSize())
|
rsWrap.getFlushQueueSize())
|
||||||
.addGauge(Interns.info(BLOCK_CACHE_FREE_SIZE, BLOCK_CACHE_FREE_DESC),
|
.addGauge(Interns.info(BLOCK_CACHE_FREE_SIZE, BLOCK_CACHE_FREE_DESC),
|
||||||
|
|
|
@ -66,6 +66,7 @@ public class FlushTableSubprocedure extends Subprocedure {
|
||||||
try {
|
try {
|
||||||
LOG.debug("Flush region " + region.toString() + " started...");
|
LOG.debug("Flush region " + region.toString() + " started...");
|
||||||
region.flush(true);
|
region.flush(true);
|
||||||
|
// TODO: flush result is not checked?
|
||||||
} finally {
|
} finally {
|
||||||
LOG.debug("Closing region operation on " + region);
|
LOG.debug("Closing region operation on " + region);
|
||||||
region.closeRegionOperation();
|
region.closeRegionOperation();
|
||||||
|
|
|
@ -119,12 +119,6 @@ public class DefaultStoreEngine extends StoreEngine<
|
||||||
return request != null;
|
return request != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Path> compact(ThroughputController throughputController)
|
|
||||||
throws IOException {
|
|
||||||
return compact(throughputController, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Path> compact(ThroughputController throughputController, User user)
|
public List<Path> compact(ThroughputController throughputController, User user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -1154,6 +1154,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
return memstoreSize.get();
|
return memstoreSize.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public RegionServicesForStores getRegionServicesForStores() {
|
public RegionServicesForStores getRegionServicesForStores() {
|
||||||
return regionServicesForStores;
|
return regionServicesForStores;
|
||||||
}
|
}
|
||||||
|
@ -2444,6 +2445,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
// Otherwise, the snapshot content while backed up in the wal, it will not
|
// Otherwise, the snapshot content while backed up in the wal, it will not
|
||||||
// be part of the current running servers state.
|
// be part of the current running servers state.
|
||||||
boolean compactionRequested = false;
|
boolean compactionRequested = false;
|
||||||
|
long flushedOutputFileSize = 0;
|
||||||
try {
|
try {
|
||||||
// A. Flush memstore to all the HStores.
|
// A. Flush memstore to all the HStores.
|
||||||
// Keep running vector of all store files that includes both old and the
|
// Keep running vector of all store files that includes both old and the
|
||||||
|
@ -2470,6 +2472,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
|
if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
|
||||||
totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
|
totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
|
||||||
}
|
}
|
||||||
|
flushedOutputFileSize += flush.getOutputFileSize();
|
||||||
}
|
}
|
||||||
storeFlushCtxs.clear();
|
storeFlushCtxs.clear();
|
||||||
|
|
||||||
|
@ -2555,10 +2558,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
status.setStatus(msg);
|
status.setStatus(msg);
|
||||||
|
|
||||||
|
if (rsServices != null && rsServices.getMetrics() != null) {
|
||||||
|
rsServices.getMetrics().updateFlush(time - startTime,
|
||||||
|
totalFlushableSizeOfFlushableStores, flushedOutputFileSize);
|
||||||
|
}
|
||||||
|
|
||||||
return new FlushResultImpl(compactionRequested ?
|
return new FlushResultImpl(compactionRequested ?
|
||||||
FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
|
FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
|
||||||
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
|
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
|
||||||
flushOpSeqId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -620,6 +620,7 @@ public class HRegionServer extends HasThread implements
|
||||||
|
|
||||||
if (!SystemUtils.IS_OS_WINDOWS) {
|
if (!SystemUtils.IS_OS_WINDOWS) {
|
||||||
Signal.handle(new Signal("HUP"), new SignalHandler() {
|
Signal.handle(new Signal("HUP"), new SignalHandler() {
|
||||||
|
@Override
|
||||||
public void handle(Signal signal) {
|
public void handle(Signal signal) {
|
||||||
getConfiguration().reloadConfiguration();
|
getConfiguration().reloadConfiguration();
|
||||||
configurationManager.notifyAllObservers(getConfiguration());
|
configurationManager.notifyAllObservers(getConfiguration());
|
||||||
|
@ -3418,4 +3419,9 @@ public class HRegionServer extends HasThread implements
|
||||||
}
|
}
|
||||||
this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
|
this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetricsRegionServer getMetrics() {
|
||||||
|
return metricsRegionServer;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,6 +184,7 @@ public class HStore implements Store {
|
||||||
private volatile long compactedCellsCount = 0;
|
private volatile long compactedCellsCount = 0;
|
||||||
private volatile long majorCompactedCellsCount = 0;
|
private volatile long majorCompactedCellsCount = 0;
|
||||||
private volatile long flushedCellsSize = 0;
|
private volatile long flushedCellsSize = 0;
|
||||||
|
private volatile long flushedOutputFileSize = 0;
|
||||||
private volatile long compactedCellsSize = 0;
|
private volatile long compactedCellsSize = 0;
|
||||||
private volatile long majorCompactedCellsSize = 0;
|
private volatile long majorCompactedCellsSize = 0;
|
||||||
|
|
||||||
|
@ -1210,6 +1211,7 @@ public class HStore implements Store {
|
||||||
// Commence the compaction.
|
// Commence the compaction.
|
||||||
List<Path> newFiles = compaction.compact(throughputController, user);
|
List<Path> newFiles = compaction.compact(throughputController, user);
|
||||||
|
|
||||||
|
long outputBytes = 0L;
|
||||||
// TODO: get rid of this!
|
// TODO: get rid of this!
|
||||||
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
|
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
|
||||||
LOG.warn("hbase.hstore.compaction.complete is set to false");
|
LOG.warn("hbase.hstore.compaction.complete is set to false");
|
||||||
|
@ -1235,10 +1237,23 @@ public class HStore implements Store {
|
||||||
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
||||||
compactedCellsSize += getCompactionProgress().totalCompactedSize;
|
compactedCellsSize += getCompactionProgress().totalCompactedSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (StoreFile sf : sfs) {
|
||||||
|
outputBytes += sf.getReader().length();
|
||||||
|
}
|
||||||
|
|
||||||
// At this point the store will use new files for all new scanners.
|
// At this point the store will use new files for all new scanners.
|
||||||
completeCompaction(filesToCompact); // update store size.
|
completeCompaction(filesToCompact); // update store size.
|
||||||
|
|
||||||
logCompactionEndMessage(cr, sfs, compactionStartTime);
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
if (region.getRegionServerServices() != null
|
||||||
|
&& region.getRegionServerServices().getMetrics() != null) {
|
||||||
|
region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
|
||||||
|
now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
|
||||||
|
outputBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
logCompactionEndMessage(cr, sfs, now, compactionStartTime);
|
||||||
return sfs;
|
return sfs;
|
||||||
} finally {
|
} finally {
|
||||||
finishCompactionRequest(cr);
|
finishCompactionRequest(cr);
|
||||||
|
@ -1330,8 +1345,7 @@ public class HStore implements Store {
|
||||||
* @param compactionStartTime Start time.
|
* @param compactionStartTime Start time.
|
||||||
*/
|
*/
|
||||||
private void logCompactionEndMessage(
|
private void logCompactionEndMessage(
|
||||||
CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
|
CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
StringBuilder message = new StringBuilder(
|
StringBuilder message = new StringBuilder(
|
||||||
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
|
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
|
||||||
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
|
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
|
||||||
|
@ -2129,6 +2143,7 @@ public class HStore implements Store {
|
||||||
private List<Path> committedFiles;
|
private List<Path> committedFiles;
|
||||||
private long cacheFlushCount;
|
private long cacheFlushCount;
|
||||||
private long cacheFlushSize;
|
private long cacheFlushSize;
|
||||||
|
private long outputFileSize;
|
||||||
|
|
||||||
private StoreFlusherImpl(long cacheFlushSeqNum) {
|
private StoreFlusherImpl(long cacheFlushSeqNum) {
|
||||||
this.cacheFlushSeqNum = cacheFlushSeqNum;
|
this.cacheFlushSeqNum = cacheFlushSeqNum;
|
||||||
|
@ -2163,7 +2178,9 @@ public class HStore implements Store {
|
||||||
List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
|
List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
|
||||||
for (Path storeFilePath : tempFiles) {
|
for (Path storeFilePath : tempFiles) {
|
||||||
try {
|
try {
|
||||||
storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
|
StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
|
||||||
|
outputFileSize += sf.getReader().length();
|
||||||
|
storeFiles.add(sf);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Failed to commit store file " + storeFilePath, ex);
|
LOG.error("Failed to commit store file " + storeFilePath, ex);
|
||||||
// Try to delete the files we have committed before.
|
// Try to delete the files we have committed before.
|
||||||
|
@ -2189,11 +2206,17 @@ public class HStore implements Store {
|
||||||
|
|
||||||
HStore.this.flushedCellsCount += cacheFlushCount;
|
HStore.this.flushedCellsCount += cacheFlushCount;
|
||||||
HStore.this.flushedCellsSize += cacheFlushSize;
|
HStore.this.flushedCellsSize += cacheFlushSize;
|
||||||
|
HStore.this.flushedOutputFileSize += outputFileSize;
|
||||||
|
|
||||||
// Add new file to store files. Clear snapshot too while we have the Store write lock.
|
// Add new file to store files. Clear snapshot too while we have the Store write lock.
|
||||||
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
|
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getOutputFileSize() {
|
||||||
|
return outputFileSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Path> getCommittedFiles() {
|
public List<Path> getCommittedFiles() {
|
||||||
return committedFiles;
|
return committedFiles;
|
||||||
|
@ -2257,7 +2280,7 @@ public class HStore implements Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD =
|
public static final long FIXED_OVERHEAD =
|
||||||
ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
|
ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
|
||||||
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||||
|
@ -2304,6 +2327,11 @@ public class HStore implements Store {
|
||||||
return flushedCellsSize;
|
return flushedCellsSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getFlushedOutputFileSize() {
|
||||||
|
return flushedOutputFileSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getCompactedCellsCount() {
|
public long getCompactedCellsCount() {
|
||||||
return compactedCellsCount;
|
return compactedCellsCount;
|
||||||
|
|
|
@ -486,25 +486,16 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
*/
|
*/
|
||||||
private boolean flushRegion(final Region region, final boolean emergencyFlush,
|
private boolean flushRegion(final Region region, final boolean emergencyFlush,
|
||||||
boolean forceFlushAllStores) {
|
boolean forceFlushAllStores) {
|
||||||
long startTime = 0;
|
|
||||||
synchronized (this.regionsInQueue) {
|
synchronized (this.regionsInQueue) {
|
||||||
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
|
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
|
||||||
// Use the start time of the FlushRegionEntry if available
|
// Use the start time of the FlushRegionEntry if available
|
||||||
if (fqe != null) {
|
|
||||||
startTime = fqe.createTime;
|
|
||||||
}
|
|
||||||
if (fqe != null && emergencyFlush) {
|
if (fqe != null && emergencyFlush) {
|
||||||
// Need to remove from region from delay queue. When NOT an
|
// Need to remove from region from delay queue. When NOT an
|
||||||
// emergencyFlush, then item was removed via a flushQueue.poll.
|
// emergencyFlush, then item was removed via a flushQueue.poll.
|
||||||
flushQueue.remove(fqe);
|
flushQueue.remove(fqe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (startTime == 0) {
|
|
||||||
// Avoid getting the system time unless we don't have a FlushRegionEntry;
|
|
||||||
// shame we can't capture the time also spent in the above synchronized
|
|
||||||
// block
|
|
||||||
startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
}
|
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
notifyFlushRequest(region, emergencyFlush);
|
notifyFlushRequest(region, emergencyFlush);
|
||||||
|
@ -518,10 +509,6 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
server.compactSplitThread.requestSystemCompaction(
|
server.compactSplitThread.requestSystemCompaction(
|
||||||
region, Thread.currentThread().getName());
|
region, Thread.currentThread().getName());
|
||||||
}
|
}
|
||||||
if (flushResult.isFlushSucceeded()) {
|
|
||||||
long endTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
server.metricsRegionServer.updateFlushTime(endTime - startTime);
|
|
||||||
}
|
|
||||||
} catch (DroppedSnapshotException ex) {
|
} catch (DroppedSnapshotException ex) {
|
||||||
// Cache flush can fail in a few places. If it fails in a critical
|
// Cache flush can fail in a few places. If it fails in a critical
|
||||||
// section, we get a DroppedSnapshotException and a replay of wal
|
// section, we get a DroppedSnapshotException and a replay of wal
|
||||||
|
|
|
@ -117,7 +117,18 @@ public class MetricsRegionServer {
|
||||||
serverSource.incrSplitSuccess();
|
serverSource.incrSplitSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateFlushTime(long t) {
|
public void updateFlush(long t, long memstoreSize, long fileSize) {
|
||||||
serverSource.updateFlushTime(t);
|
serverSource.updateFlushTime(t);
|
||||||
|
serverSource.updateFlushMemstoreSize(memstoreSize);
|
||||||
|
serverSource.updateFlushOutputSize(fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateCompaction(boolean isMajor, long t, int inputFileCount, int outputFileCount,
|
||||||
|
long inputBytes, long outputBytes) {
|
||||||
|
serverSource.updateCompactionTime(isMajor, t);
|
||||||
|
serverSource.updateCompactionInputFileCount(isMajor, inputFileCount);
|
||||||
|
serverSource.updateCompactionOutputFileCount(isMajor, outputFileCount);
|
||||||
|
serverSource.updateCompactionInputSize(isMajor, inputBytes);
|
||||||
|
serverSource.updateCompactionOutputSize(isMajor, outputBytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,6 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
|
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
|
||||||
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
||||||
|
@ -1421,14 +1420,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
if (shouldFlush) {
|
if (shouldFlush) {
|
||||||
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
|
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
|
||||||
request.getWriteFlushWalMarker() : false;
|
request.getWriteFlushWalMarker() : false;
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
// Go behind the curtain so we can manage writing of the flush WAL marker
|
// Go behind the curtain so we can manage writing of the flush WAL marker
|
||||||
HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
|
HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
|
||||||
((HRegion)region).flushcache(true, writeFlushWalMarker);
|
((HRegion)region).flushcache(true, writeFlushWalMarker);
|
||||||
if (flushResult.isFlushSucceeded()) {
|
|
||||||
long endTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
|
|
||||||
}
|
|
||||||
boolean compactionNeeded = flushResult.isCompactionNeeded();
|
boolean compactionNeeded = flushResult.isCompactionNeeded();
|
||||||
if (compactionNeeded) {
|
if (compactionNeeded) {
|
||||||
regionServer.compactSplitThread.requestSystemCompaction(region,
|
regionServer.compactSplitThread.requestSystemCompaction(region,
|
||||||
|
@ -1567,18 +1561,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
LOG.info("Receiving merging request for " + regionA + ", " + regionB
|
LOG.info("Receiving merging request for " + regionA + ", " + regionB
|
||||||
+ ",forcible=" + forcible);
|
+ ",forcible=" + forcible);
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
regionA.flush(true);
|
||||||
FlushResult flushResult = regionA.flush(true);
|
regionB.flush(true);
|
||||||
if (flushResult.isFlushSucceeded()) {
|
|
||||||
long endTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
|
|
||||||
}
|
|
||||||
startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
flushResult = regionB.flush(true);
|
|
||||||
if (flushResult.isFlushSucceeded()) {
|
|
||||||
long endTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
|
|
||||||
}
|
|
||||||
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
|
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
|
||||||
masterSystemTime, RpcServer.getRequestUser());
|
masterSystemTime, RpcServer.getRequestUser());
|
||||||
return MergeRegionsResponse.newBuilder().build();
|
return MergeRegionsResponse.newBuilder().build();
|
||||||
|
@ -1991,12 +1975,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
+ "Replicas are auto-split when their primary is split.");
|
+ "Replicas are auto-split when their primary is split.");
|
||||||
}
|
}
|
||||||
LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
|
LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
region.flush(true);
|
||||||
FlushResult flushResult = region.flush(true);
|
|
||||||
if (flushResult.isFlushSucceeded()) {
|
|
||||||
long endTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
|
|
||||||
}
|
|
||||||
byte[] splitPoint = null;
|
byte[] splitPoint = null;
|
||||||
if (request.hasSplitPoint()) {
|
if (request.hasSplitPoint()) {
|
||||||
splitPoint = request.getSplitPoint().toByteArray();
|
splitPoint = request.getSplitPoint().toByteArray();
|
||||||
|
|
|
@ -244,4 +244,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
|
||||||
* global memstore size already exceeds lower limit.
|
* global memstore size already exceeds lower limit.
|
||||||
*/
|
*/
|
||||||
double getFlushPressure();
|
double getFlushPressure();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the metrics tracker for the region server
|
||||||
|
*/
|
||||||
|
MetricsRegionServer getMetrics();
|
||||||
}
|
}
|
||||||
|
|
|
@ -438,6 +438,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
||||||
*/
|
*/
|
||||||
long getFlushedCellsSize();
|
long getFlushedCellsSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The total size of out output files on disk, in bytes
|
||||||
|
*/
|
||||||
|
long getFlushedOutputFileSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The number of cells processed during minor compactions
|
* @return The number of cells processed during minor compactions
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -85,4 +85,9 @@ interface StoreFlushContext {
|
||||||
* @return a list of Paths for new files
|
* @return a list of Paths for new files
|
||||||
*/
|
*/
|
||||||
List<Path> getCommittedFiles();
|
List<Path> getCommittedFiles();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the total file size for flush output files, in bytes
|
||||||
|
*/
|
||||||
|
long getOutputFileSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,13 +99,6 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Path> compact(ThroughputController throughputController)
|
|
||||||
throws IOException {
|
|
||||||
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
|
|
||||||
return this.stripeRequest.execute(compactor, throughputController, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Path> compact(ThroughputController throughputController, User user)
|
public List<Path> compact(ThroughputController throughputController, User user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -66,13 +66,6 @@ public abstract class CompactionContext {
|
||||||
this.request = request;
|
this.request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Runs the compaction based on current selection. select/forceSelect must have been called.
|
|
||||||
* @return The new file paths resulting from compaction.
|
|
||||||
*/
|
|
||||||
public abstract List<Path> compact(ThroughputController throughputController)
|
|
||||||
throws IOException;
|
|
||||||
|
|
||||||
public abstract List<Path> compact(ThroughputController throughputController, User user)
|
public abstract List<Path> compact(ThroughputController throughputController, User user)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
||||||
this.regionName = other.regionName;
|
this.regionName = other.regionName;
|
||||||
this.storeName = other.storeName;
|
this.storeName = other.storeName;
|
||||||
this.totalSize = other.totalSize;
|
this.totalSize = other.totalSize;
|
||||||
|
recalculateSize();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,10 +226,12 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
|
||||||
Collections2.transform(Collections2.filter(
|
Collections2.transform(Collections2.filter(
|
||||||
this.getFiles(),
|
this.getFiles(),
|
||||||
new Predicate<StoreFile>() {
|
new Predicate<StoreFile>() {
|
||||||
|
@Override
|
||||||
public boolean apply(StoreFile sf) {
|
public boolean apply(StoreFile sf) {
|
||||||
return sf.getReader() != null;
|
return sf.getReader() != null;
|
||||||
}
|
}
|
||||||
}), new Function<StoreFile, String>() {
|
}), new Function<StoreFile, String>() {
|
||||||
|
@Override
|
||||||
public String apply(StoreFile sf) {
|
public String apply(StoreFile sf) {
|
||||||
return StringUtils.humanReadableInt(
|
return StringUtils.humanReadableInt(
|
||||||
(sf.getReader() == null) ? 0 : sf.getReader().length());
|
(sf.getReader() == null) ? 0 : sf.getReader().length());
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.Leases;
|
import org.apache.hadoop.hbase.regionserver.Leases;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
@ -319,6 +320,7 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ThroughputController getFlushThroughputController() {
|
public ThroughputController getFlushThroughputController() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -327,4 +329,9 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
public double getFlushPressure() {
|
public double getFlushPressure() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetricsRegionServer getMetrics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.Leases;
|
import org.apache.hadoop.hbase.regionserver.Leases;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
@ -664,6 +665,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ThroughputController getFlushThroughputController() {
|
public ThroughputController getFlushThroughputController() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -672,4 +674,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
public double getFlushPressure() {
|
public double getFlushPressure() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MetricsRegionServer getMetrics() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -158,6 +158,7 @@ public class TestCompaction {
|
||||||
|
|
||||||
HRegion spyR = spy(r);
|
HRegion spyR = spy(r);
|
||||||
doAnswer(new Answer() {
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
r.writestate.writesEnabled = false;
|
r.writestate.writesEnabled = false;
|
||||||
return invocation.callRealMethod();
|
return invocation.callRealMethod();
|
||||||
|
@ -362,12 +363,6 @@ public class TestCompaction {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Path> compact(ThroughputController throughputController)
|
|
||||||
throws IOException {
|
|
||||||
return compact(throughputController, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Path> compact(ThroughputController throughputController, User user)
|
public List<Path> compact(ThroughputController throughputController, User user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -420,12 +415,6 @@ public class TestCompaction {
|
||||||
synchronized (this) { this.notifyAll(); }
|
synchronized (this) { this.notifyAll(); }
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Path> compact(ThroughputController throughputController)
|
|
||||||
throws IOException {
|
|
||||||
return compact(throughputController, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Path> compact(ThroughputController throughputController, User user)
|
public List<Path> compact(ThroughputController throughputController, User user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -467,6 +456,7 @@ public class TestCompaction {
|
||||||
@Override
|
@Override
|
||||||
public void cancelCompaction(Object object) {}
|
public void cancelCompaction(Object object) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getPriority() {
|
public int getPriority() {
|
||||||
return Integer.MIN_VALUE; // some invalid value, see createStoreMock
|
return Integer.MIN_VALUE; // some invalid value, see createStoreMock
|
||||||
}
|
}
|
||||||
|
@ -511,9 +501,10 @@ public class TestCompaction {
|
||||||
when(
|
when(
|
||||||
r.compact(any(CompactionContext.class), any(Store.class),
|
r.compact(any(CompactionContext.class), any(Store.class),
|
||||||
any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
|
any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
|
||||||
|
@Override
|
||||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||||
invocation.getArgumentAt(0, CompactionContext.class).compact(
|
invocation.getArgumentAt(0, CompactionContext.class).compact(
|
||||||
invocation.getArgumentAt(2, ThroughputController.class));
|
invocation.getArgumentAt(2, ThroughputController.class), null);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -138,5 +138,69 @@ public class TestMetricsRegionServer {
|
||||||
HELPER.assertCounter("slowIncrementCount", 15, serverSource);
|
HELPER.assertCounter("slowIncrementCount", 15, serverSource);
|
||||||
HELPER.assertCounter("slowPutCount", 16, serverSource);
|
HELPER.assertCounter("slowPutCount", 16, serverSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String FLUSH_TIME = "flushTime";
|
||||||
|
String FLUSH_TIME_DESC = "Histogram for the time in millis for memstore flush";
|
||||||
|
String FLUSH_MEMSTORE_SIZE = "flushMemstoreSize";
|
||||||
|
String FLUSH_MEMSTORE_SIZE_DESC = "Histogram for number of bytes in the memstore for a flush";
|
||||||
|
String FLUSH_FILE_SIZE = "flushFileSize";
|
||||||
|
String FLUSH_FILE_SIZE_DESC = "Histogram for number of bytes in the resulting file for a flush";
|
||||||
|
String FLUSHED_OUTPUT_BYTES = "flushedOutputBytes";
|
||||||
|
String FLUSHED_OUTPUT_BYTES_DESC = "Total number of bytes written from flush";
|
||||||
|
String FLUSHED_MEMSTORE_BYTES = "flushedMemstoreBytes";
|
||||||
|
String FLUSHED_MEMSTORE_BYTES_DESC = "Total number of bytes of cells in memstore from flush";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlush() {
|
||||||
|
rsm.updateFlush(1, 2, 3);
|
||||||
|
HELPER.assertCounter("flushTime_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("flushMemstoreSize_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("flushOutputSize_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("flushedMemstoreBytes", 2, serverSource);
|
||||||
|
HELPER.assertCounter("flushedOutputBytes", 3, serverSource);
|
||||||
|
|
||||||
|
rsm.updateFlush(10, 20, 30);
|
||||||
|
HELPER.assertCounter("flushTimeNumOps", 2, serverSource);
|
||||||
|
HELPER.assertCounter("flushMemstoreSize_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("flushOutputSize_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("flushedMemstoreBytes", 22, serverSource);
|
||||||
|
HELPER.assertCounter("flushedOutputBytes", 33, serverSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompaction() {
|
||||||
|
rsm.updateCompaction(false, 1, 2, 3, 4, 5);
|
||||||
|
HELPER.assertCounter("compactionTime_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputFileCount_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputSize_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("compactionOutputFileCount_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("compactedInputBytes", 4, serverSource);
|
||||||
|
HELPER.assertCounter("compactedoutputBytes", 5, serverSource);
|
||||||
|
|
||||||
|
rsm.updateCompaction(false, 10, 20, 30, 40, 50);
|
||||||
|
HELPER.assertCounter("compactionTime_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputFileCount_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputSize_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("compactionOutputFileCount_num_ops", 2, serverSource);
|
||||||
|
HELPER.assertCounter("compactedInputBytes", 44, serverSource);
|
||||||
|
HELPER.assertCounter("compactedoutputBytes", 55, serverSource);
|
||||||
|
|
||||||
|
// do major compaction
|
||||||
|
rsm.updateCompaction(true, 100, 200, 300, 400, 500);
|
||||||
|
|
||||||
|
HELPER.assertCounter("compactionTime_num_ops", 3, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputFileCount_num_ops", 3, serverSource);
|
||||||
|
HELPER.assertCounter("compactionInputSize_num_ops", 3, serverSource);
|
||||||
|
HELPER.assertCounter("compactionOutputFileCount_num_ops", 3, serverSource);
|
||||||
|
HELPER.assertCounter("compactedInputBytes", 444, serverSource);
|
||||||
|
HELPER.assertCounter("compactedoutputBytes", 555, serverSource);
|
||||||
|
|
||||||
|
HELPER.assertCounter("majorCompactionTime_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("majorCompactionInputFileCount_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("majorCompactionInputSize_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("majorCompactionOutputFileCount_num_ops", 1, serverSource);
|
||||||
|
HELPER.assertCounter("majorCompactedInputBytes", 400, serverSource);
|
||||||
|
HELPER.assertCounter("majorCompactedoutputBytes", 500, serverSource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class TestStripeStoreEngine {
|
||||||
assertEquals(2, compaction.getRequest().getFiles().size());
|
assertEquals(2, compaction.getRequest().getFiles().size());
|
||||||
assertFalse(compaction.getRequest().getFiles().contains(sf));
|
assertFalse(compaction.getRequest().getFiles().contains(sf));
|
||||||
// Make sure the correct method it called on compactor.
|
// Make sure the correct method it called on compactor.
|
||||||
compaction.compact(NoLimitThroughputController.INSTANCE);
|
compaction.compact(NoLimitThroughputController.INSTANCE, null);
|
||||||
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
|
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
|
||||||
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
|
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
|
||||||
NoLimitThroughputController.INSTANCE, null);
|
NoLimitThroughputController.INSTANCE, null);
|
||||||
|
|
Loading…
Reference in New Issue