HBASE-11702 Better introspection of long running compactions

This commit is contained in:
Andrew Purtell 2014-08-14 09:12:52 -07:00
parent 4ed32bd77e
commit 6367074381
9 changed files with 269 additions and 3 deletions

View File

@ -218,5 +218,20 @@ public interface MetricsRegionServerSource extends BaseSource {
String SLOW_APPEND_DESC = String SLOW_APPEND_DESC =
"The number of Appends that took over 1000ms to complete"; "The number of Appends that took over 1000ms to complete";
String FLUSHED_CELLS = "flushedCellsCount";
String FLUSHED_CELLS_DESC = "The number of cells flushed to disk";
String FLUSHED_CELLS_SIZE = "flushedCellsSize";
String FLUSHED_CELLS_SIZE_DESC = "The total amount of data flushed to disk, in bytes";
String COMPACTED_CELLS = "compactedCellsCount";
String COMPACTED_CELLS_DESC = "The number of cells processed during minor compactions";
String COMPACTED_CELLS_SIZE = "compactedCellsSize";
String COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during minor compactions, in bytes";
String MAJOR_COMPACTED_CELLS = "majorCompactedCellsCount";
String MAJOR_COMPACTED_CELLS_DESC =
"The number of cells processed during major compactions";
String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize";
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during major compactions, in bytes";
} }

View File

@ -216,4 +216,34 @@ public interface MetricsRegionServerWrapper {
* Get the amount of time that updates were blocked. * Get the amount of time that updates were blocked.
*/ */
long getUpdatesBlockedTime(); long getUpdatesBlockedTime();
/**
* Get the number of cells flushed to disk.
*/
long getFlushedCellsCount();
/**
* Get the number of cells processed during minor compactions.
*/
long getCompactedCellsCount();
/**
* Get the number of cells processed during major compactions.
*/
long getMajorCompactedCellsCount();
/**
* Get the total amount of data flushed to disk, in bytes.
*/
long getFlushedCellsSize();
/**
* Get the total amount of data processed during minor compactions, in bytes.
*/
long getCompactedCellsSize();
/**
* Get the total amount of data processed during major compactions, in bytes.
*/
long getMajorCompactedCellsSize();
} }

View File

@ -207,6 +207,18 @@ public class MetricsRegionServerSourceImpl
BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent()) BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent())
.addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC),
rsWrap.getUpdatesBlockedTime()) rsWrap.getUpdatesBlockedTime())
.addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC),
rsWrap.getFlushedCellsCount())
.addCounter(Interns.info(COMPACTED_CELLS, COMPACTED_CELLS_DESC),
rsWrap.getCompactedCellsCount())
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC),
rsWrap.getMajorCompactedCellsCount())
.addCounter(Interns.info(FLUSHED_CELLS_SIZE, FLUSHED_CELLS_SIZE_DESC),
rsWrap.getFlushedCellsSize())
.addCounter(Interns.info(COMPACTED_CELLS_SIZE, COMPACTED_CELLS_SIZE_DESC),
rsWrap.getCompactedCellsSize())
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
rsWrap.getMajorCompactedCellsSize())
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
rsWrap.getZookeeperQuorum()) rsWrap.getZookeeperQuorum())
.tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())

View File

@ -189,6 +189,13 @@ public class HStore implements Store {
private Encryption.Context cryptoContext = Encryption.Context.NONE; private Encryption.Context cryptoContext = Encryption.Context.NONE;
private volatile long flushedCellsCount = 0;
private volatile long compactedCellsCount = 0;
private volatile long majorCompactedCellsCount = 0;
private volatile long flushedCellsSize = 0;
private volatile long compactedCellsSize = 0;
private volatile long majorCompactedCellsSize = 0;
/** /**
* Constructor * Constructor
* @param region * @param region
@ -1157,6 +1164,13 @@ public class HStore implements Store {
sfs = moveCompatedFilesIntoPlace(cr, newFiles); sfs = moveCompatedFilesIntoPlace(cr, newFiles);
writeCompactionWalRecord(filesToCompact, sfs); writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
} else {
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
// At this point the store will use new files for all new scanners. // At this point the store will use new files for all new scanners.
completeCompaction(filesToCompact, true); // Archive old files & update store size. completeCompaction(filesToCompact, true); // Archive old files & update store size.
} finally { } finally {
@ -2037,6 +2051,8 @@ public class HStore implements Store {
private MemStoreSnapshot snapshot; private MemStoreSnapshot snapshot;
private List<Path> tempFiles; private List<Path> tempFiles;
private List<Path> committedFiles; private List<Path> committedFiles;
private long cacheFlushCount;
private long cacheFlushSize;
private StoreFlusherImpl(long cacheFlushSeqNum) { private StoreFlusherImpl(long cacheFlushSeqNum) {
this.cacheFlushSeqNum = cacheFlushSeqNum; this.cacheFlushSeqNum = cacheFlushSeqNum;
@ -2049,6 +2065,8 @@ public class HStore implements Store {
@Override @Override
public void prepare() { public void prepare() {
this.snapshot = memstore.snapshot(); this.snapshot = memstore.snapshot();
this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getSize();
committedFiles = new ArrayList<Path>(1); committedFiles = new ArrayList<Path>(1);
} }
@ -2088,6 +2106,10 @@ public class HStore implements Store {
} }
committedFiles.add(sf.getPath()); committedFiles.add(sf.getPath());
} }
HStore.this.flushedCellsCount += cacheFlushCount;
HStore.this.flushedCellsSize += cacheFlushSize;
// Add new file to store files. Clear snapshot too while we have the Store write lock. // Add new file to store files. Clear snapshot too while we have the Store write lock.
return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
} }
@ -2109,7 +2131,7 @@ public class HStore implements Store {
} }
public static final long FIXED_OVERHEAD = public static final long FIXED_OVERHEAD =
ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@ -2145,4 +2167,34 @@ public class HStore implements Store {
public boolean hasTooManyStoreFiles() { public boolean hasTooManyStoreFiles() {
return getStorefilesCount() > this.blockingFileCount; return getStorefilesCount() > this.blockingFileCount;
} }
@Override
public long getFlushedCellsCount() {
return flushedCellsCount;
}
@Override
public long getFlushedCellsSize() {
return flushedCellsSize;
}
@Override
public long getCompactedCellsCount() {
return compactedCellsCount;
}
@Override
public long getCompactedCellsSize() {
return compactedCellsSize;
}
@Override
public long getMajorCompactedCellsCount() {
return majorCompactedCellsCount;
}
@Override
public long getMajorCompactedCellsSize() {
return majorCompactedCellsSize;
}
} }

View File

@ -66,6 +66,12 @@ class MetricsRegionServerWrapperImpl
private volatile long numMutationsWithoutWAL = 0; private volatile long numMutationsWithoutWAL = 0;
private volatile long dataInMemoryWithoutWAL = 0; private volatile long dataInMemoryWithoutWAL = 0;
private volatile int percentFileLocal = 0; private volatile int percentFileLocal = 0;
private volatile long flushedCellsCount = 0;
private volatile long compactedCellsCount = 0;
private volatile long majorCompactedCellsCount = 0;
private volatile long flushedCellsSize = 0;
private volatile long compactedCellsSize = 0;
private volatile long majorCompactedCellsSize = 0;
private CacheStats cacheStats; private CacheStats cacheStats;
private ScheduledExecutorService executor; private ScheduledExecutorService executor;
@ -353,6 +359,35 @@ class MetricsRegionServerWrapperImpl
return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get(); return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get();
} }
@Override
public long getFlushedCellsCount() {
return flushedCellsCount;
}
@Override
public long getCompactedCellsCount() {
return compactedCellsCount;
}
@Override
public long getMajorCompactedCellsCount() {
return majorCompactedCellsCount;
}
@Override
public long getFlushedCellsSize() {
return flushedCellsSize;
}
@Override
public long getCompactedCellsSize() {
return compactedCellsSize;
}
@Override
public long getMajorCompactedCellsSize() {
return majorCompactedCellsSize;
}
/** /**
* This is the runnable that will be executed on the executor every PERIOD number of seconds * This is the runnable that will be executed on the executor every PERIOD number of seconds
@ -386,7 +421,12 @@ class MetricsRegionServerWrapperImpl
long tempNumMutationsWithoutWAL = 0; long tempNumMutationsWithoutWAL = 0;
long tempDataInMemoryWithoutWAL = 0; long tempDataInMemoryWithoutWAL = 0;
int tempPercentFileLocal = 0; int tempPercentFileLocal = 0;
long tempFlushedCellsCount = 0;
long tempCompactedCellsCount = 0;
long tempMajorCompactedCellsCount = 0;
long tempFlushedCellsSize = 0;
long tempCompactedCellsSize = 0;
long tempMajorCompactedCellsSize = 0;
for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
@ -403,6 +443,12 @@ class MetricsRegionServerWrapperImpl
tempStorefileIndexSize += store.getStorefilesIndexSize(); tempStorefileIndexSize += store.getStorefilesIndexSize();
tempTotalStaticBloomSize += store.getTotalStaticBloomSize(); tempTotalStaticBloomSize += store.getTotalStaticBloomSize();
tempTotalStaticIndexSize += store.getTotalStaticIndexSize(); tempTotalStaticIndexSize += store.getTotalStaticIndexSize();
tempFlushedCellsCount += store.getFlushedCellsCount();
tempCompactedCellsCount += store.getCompactedCellsCount();
tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount();
tempFlushedCellsSize += store.getFlushedCellsSize();
tempCompactedCellsSize += store.getCompactedCellsSize();
tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
} }
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
@ -459,6 +505,12 @@ class MetricsRegionServerWrapperImpl
numMutationsWithoutWAL = tempNumMutationsWithoutWAL; numMutationsWithoutWAL = tempNumMutationsWithoutWAL;
dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL; dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL;
percentFileLocal = tempPercentFileLocal; percentFileLocal = tempPercentFileLocal;
flushedCellsCount = tempFlushedCellsCount;
compactedCellsCount = tempCompactedCellsCount;
majorCompactedCellsCount = tempMajorCompactedCellsCount;
flushedCellsSize = tempFlushedCellsSize;
compactedCellsSize = tempCompactedCellsSize;
majorCompactedCellsSize = tempMajorCompactedCellsSize;
} }
} }
} }

View File

@ -343,6 +343,36 @@ public interface Store extends HeapSize, StoreConfigInformation {
TableName getTableName(); TableName getTableName();
/**
* @return The number of cells flushed to disk
*/
long getFlushedCellsCount();
/**
* @return The total size of data flushed to disk, in bytes
*/
long getFlushedCellsSize();
/**
* @return The number of cells processed during minor compactions
*/
long getCompactedCellsCount();
/**
* @return The total amount of data processed during minor compactions, in bytes
*/
long getCompactedCellsSize();
/**
* @return The number of cells processed during major compactions
*/
long getMajorCompactedCellsCount();
/**
* @return The total amount of data processed during major compactions, in bytes
*/
long getMajorCompactedCellsSize();
/* /*
* @param o Observer who wants to know about changes in set of Readers * @param o Observer who wants to know about changes in set of Readers
*/ */

View File

@ -37,6 +37,8 @@ public class CompactionProgress {
public long totalCompactingKVs; public long totalCompactingKVs;
/** the completed count of key values in currently running compaction */ /** the completed count of key values in currently running compaction */
public long currentCompactedKVs = 0; public long currentCompactedKVs = 0;
/** the total size of data processed by the currently running compaction, in bytes */
public long totalCompactedSize = 0;
/** Constructor /** Constructor
* @param totalCompactingKVs the total Key/Value pairs to be compacted * @param totalCompactingKVs the total Key/Value pairs to be compacted
@ -66,4 +68,31 @@ public class CompactionProgress {
public void complete() { public void complete() {
this.totalCompactingKVs = this.currentCompactedKVs; this.totalCompactingKVs = this.currentCompactedKVs;
} }
/**
* @return the total compacting key values in currently running compaction
*/
public long getTotalCompactingKvs() {
return totalCompactingKVs;
}
/**
* @return the completed count of key values in currently running compaction
*/
public long getCurrentCompactedKvs() {
return currentCompactedKVs;
}
/**
* @return the total data size processed by the currently running compaction, in bytes
*/
public long getTotalCompactedSize() {
return totalCompactedSize;
}
@Override
public String toString() {
return String.format("%d/%d (%.2f%%)", currentCompactedKVs, totalCompactingKVs,
100 * getProgressPct());
}
} }

View File

@ -227,8 +227,13 @@ public abstract class Compactor {
// Since scanner.next() can return 'false' but still be delivering data, // Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop. // we have to use a do/while loop.
List<Cell> kvs = new ArrayList<Cell>(); List<Cell> kvs = new ArrayList<Cell>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval(); int closeCheckInterval = HStore.getCloseCheckInterval();
long lastMillis;
if (LOG.isDebugEnabled()) {
lastMillis = System.currentTimeMillis();
} else {
lastMillis = 0;
}
boolean hasMore; boolean hasMore;
do { do {
hasMore = scanner.next(kvs, compactionKVMax); hasMore = scanner.next(kvs, compactionKVMax);
@ -240,11 +245,22 @@ public abstract class Compactor {
} }
writer.append(kv); writer.append(kv);
++progress.currentCompactedKVs; ++progress.currentCompactedKVs;
progress.totalCompactedSize += kv.getLength();
// check periodically to see if a system stop is requested // check periodically to see if a system stop is requested
if (closeCheckInterval > 0) { if (closeCheckInterval > 0) {
bytesWritten += kv.getLength(); bytesWritten += kv.getLength();
if (bytesWritten > closeCheckInterval) { if (bytesWritten > closeCheckInterval) {
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
if ((now - lastMillis) >= 60 * 1000) {
LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
(bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0)));
lastMillis = now;
}
}
bytesWritten = 0; bytesWritten = 0;
if (!store.areWritesEnabled()) { if (!store.areWritesEnabled()) {
progress.cancel(); progress.cancel();

View File

@ -211,4 +211,34 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
return 1024000; return 1024000;
} }
@Override
public long getFlushedCellsCount() {
return 100000000;
}
@Override
public long getCompactedCellsCount() {
return 10000000;
}
@Override
public long getMajorCompactedCellsCount() {
return 1000000;
}
@Override
public long getFlushedCellsSize() {
return 1024000000;
}
@Override
public long getCompactedCellsSize() {
return 102400000;
}
@Override
public long getMajorCompactedCellsSize() {
return 10240000;
}
} }