diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index cbe58547a47..43987946e68 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -218,5 +218,20 @@ public interface MetricsRegionServerSource extends BaseSource { String SLOW_APPEND_DESC = "The number of Appends that took over 1000ms to complete"; + String FLUSHED_CELLS = "flushedCellsCount"; + String FLUSHED_CELLS_DESC = "The number of cells flushed to disk"; + String FLUSHED_CELLS_SIZE = "flushedCellsSize"; + String FLUSHED_CELLS_SIZE_DESC = "The total amount of data flushed to disk, in bytes"; + String COMPACTED_CELLS = "compactedCellsCount"; + String COMPACTED_CELLS_DESC = "The number of cells processed during minor compactions"; + String COMPACTED_CELLS_SIZE = "compactedCellsSize"; + String COMPACTED_CELLS_SIZE_DESC = + "The total amount of data processed during minor compactions, in bytes"; + String MAJOR_COMPACTED_CELLS = "majorCompactedCellsCount"; + String MAJOR_COMPACTED_CELLS_DESC = + "The number of cells processed during major compactions"; + String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize"; + String MAJOR_COMPACTED_CELLS_SIZE_DESC = + "The total amount of data processed during major compactions, in bytes"; } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 057c48d1f02..998bd17e67a 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -216,4 +216,34 @@ public interface MetricsRegionServerWrapper { * Get the amount of time that updates were blocked. */ long getUpdatesBlockedTime(); + + /** + * Get the number of cells flushed to disk. + */ + long getFlushedCellsCount(); + + /** + * Get the number of cells processed during minor compactions. + */ + long getCompactedCellsCount(); + + /** + * Get the number of cells processed during major compactions. + */ + long getMajorCompactedCellsCount(); + + /** + * Get the total amount of data flushed to disk, in bytes. + */ + long getFlushedCellsSize(); + + /** + * Get the total amount of data processed during minor compactions, in bytes. + */ + long getCompactedCellsSize(); + + /** + * Get the total amount of data processed during major compactions, in bytes. + */ + long getMajorCompactedCellsSize(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 1964b42b928..365d2b2b4ab 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -207,6 +207,18 @@ public class MetricsRegionServerSourceImpl BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) + .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), + rsWrap.getFlushedCellsCount()) + .addCounter(Interns.info(COMPACTED_CELLS, COMPACTED_CELLS_DESC), + rsWrap.getCompactedCellsCount()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC), + rsWrap.getMajorCompactedCellsCount()) + .addCounter(Interns.info(FLUSHED_CELLS_SIZE, FLUSHED_CELLS_SIZE_DESC), + rsWrap.getFlushedCellsSize()) + .addCounter(Interns.info(COMPACTED_CELLS_SIZE, COMPACTED_CELLS_SIZE_DESC), + rsWrap.getCompactedCellsSize()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC), + rsWrap.getMajorCompactedCellsSize()) .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), rsWrap.getZookeeperQuorum()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 3f5729adda5..05a56c6bc36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -189,6 +189,13 @@ public class HStore implements Store { private Encryption.Context cryptoContext = Encryption.Context.NONE; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + private volatile long flushedCellsSize = 0; + private volatile long compactedCellsSize = 0; + private volatile long majorCompactedCellsSize = 0; + /** * Constructor * @param region @@ -1157,6 +1164,13 @@ public class HStore implements Store { sfs = moveCompatedFilesIntoPlace(cr, newFiles); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + compactedCellsSize += getCompactionProgress().totalCompactedSize; + } // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact, true); // Archive old files & update store size. } finally { @@ -2036,6 +2050,8 @@ public class HStore implements Store { private MemStoreSnapshot snapshot; private List tempFiles; private List committedFiles; + private long cacheFlushCount; + private long cacheFlushSize; private StoreFlusherImpl(long cacheFlushSeqNum) { this.cacheFlushSeqNum = cacheFlushSeqNum; @@ -2048,6 +2064,8 @@ public class HStore implements Store { @Override public void prepare() { this.snapshot = memstore.snapshot(); + this.cacheFlushCount = snapshot.getCellsCount(); + this.cacheFlushSize = snapshot.getSize(); committedFiles = new ArrayList(1); } @@ -2087,6 +2105,10 @@ public class HStore implements Store { } committedFiles.add(sf.getPath()); } + + HStore.this.flushedCellsCount += cacheFlushCount; + HStore.this.flushedCellsSize += cacheFlushSize; + // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); } @@ -2108,7 +2130,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2144,4 +2166,34 @@ public class HStore implements Store { public boolean hasTooManyStoreFiles() { return getStorefilesCount() > this.blockingFileCount; } + + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getFlushedCellsSize() { + return flushedCellsSize; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getCompactedCellsSize() { + return compactedCellsSize; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + + @Override + public long getMajorCompactedCellsSize() { + return majorCompactedCellsSize; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 36548351bb9..5da1ea1e751 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -66,6 +66,12 @@ class MetricsRegionServerWrapperImpl private volatile long numMutationsWithoutWAL = 0; private volatile long dataInMemoryWithoutWAL = 0; private volatile int percentFileLocal = 0; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + private volatile long flushedCellsSize = 0; + private volatile long compactedCellsSize = 0; + private volatile long majorCompactedCellsSize = 0; private CacheStats cacheStats; private ScheduledExecutorService executor; @@ -353,6 +359,35 @@ class MetricsRegionServerWrapperImpl return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get(); } + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + + @Override + public long getFlushedCellsSize() { + return flushedCellsSize; + } + + @Override + public long getCompactedCellsSize() { + return compactedCellsSize; + } + + @Override + public long getMajorCompactedCellsSize() { + return majorCompactedCellsSize; + } /** * This is the runnable that will be executed on the executor every PERIOD number of seconds @@ -386,7 +421,12 @@ class MetricsRegionServerWrapperImpl long tempNumMutationsWithoutWAL = 0; long tempDataInMemoryWithoutWAL = 0; int tempPercentFileLocal = 0; - + long tempFlushedCellsCount = 0; + long tempCompactedCellsCount = 0; + long tempMajorCompactedCellsCount = 0; + long tempFlushedCellsSize = 0; + long tempCompactedCellsSize = 0; + long tempMajorCompactedCellsSize = 0; for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); @@ -403,6 +443,12 @@ class MetricsRegionServerWrapperImpl tempStorefileIndexSize += store.getStorefilesIndexSize(); tempTotalStaticBloomSize += store.getTotalStaticBloomSize(); tempTotalStaticIndexSize += store.getTotalStaticIndexSize(); + tempFlushedCellsCount += store.getFlushedCellsCount(); + tempCompactedCellsCount += store.getCompactedCellsCount(); + tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount(); + tempFlushedCellsSize += store.getFlushedCellsSize(); + tempCompactedCellsSize += store.getCompactedCellsSize(); + tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize(); } hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); @@ -459,6 +505,12 @@ class MetricsRegionServerWrapperImpl numMutationsWithoutWAL = tempNumMutationsWithoutWAL; dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL; percentFileLocal = tempPercentFileLocal; + flushedCellsCount = tempFlushedCellsCount; + compactedCellsCount = tempCompactedCellsCount; + majorCompactedCellsCount = tempMajorCompactedCellsCount; + flushedCellsSize = tempFlushedCellsSize; + compactedCellsSize = tempCompactedCellsSize; + majorCompactedCellsSize = tempMajorCompactedCellsSize; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 280a1b81733..d7829133216 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -343,6 +343,36 @@ public interface Store extends HeapSize, StoreConfigInformation { TableName getTableName(); + /** + * @return The number of cells flushed to disk + */ + long getFlushedCellsCount(); + + /** + * @return The total size of data flushed to disk, in bytes + */ + long getFlushedCellsSize(); + + /** + * @return The number of cells processed during minor compactions + */ + long getCompactedCellsCount(); + + /** + * @return The total amount of data processed during minor compactions, in bytes + */ + long getCompactedCellsSize(); + + /** + * @return The number of cells processed during major compactions + */ + long getMajorCompactedCellsCount(); + + /** + * @return The total amount of data processed during major compactions, in bytes + */ + long getMajorCompactedCellsSize(); + /* * @param o Observer who wants to know about changes in set of Readers */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 67eb6222730..d9d74ee097a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -37,6 +37,8 @@ public class CompactionProgress { public long totalCompactingKVs; /** the completed count of key values in currently running compaction */ public long currentCompactedKVs = 0; + /** the total size of data processed by the currently running compaction, in bytes */ + public long totalCompactedSize = 0; /** Constructor * @param totalCompactingKVs the total Key/Value pairs to be compacted @@ -66,4 +68,31 @@ public class CompactionProgress { public void complete() { this.totalCompactingKVs = this.currentCompactedKVs; } + + /** + * @return the total compacting key values in currently running compaction + */ + public long getTotalCompactingKvs() { + return totalCompactingKVs; + } + + /** + * @return the completed count of key values in currently running compaction + */ + public long getCurrentCompactedKvs() { + return currentCompactedKVs; + } + + /** + * @return the total data size processed by the currently running compaction, in bytes + */ + public long getTotalCompactedSize() { + return totalCompactedSize; + } + + @Override + public String toString() { + return String.format("%d/%d (%.2f%%)", currentCompactedKVs, totalCompactingKVs, + 100 * getProgressPct()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index a1d629a3f70..fdc38c54624 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -227,8 +227,13 @@ public abstract class Compactor { // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME int closeCheckInterval = HStore.getCloseCheckInterval(); + long lastMillis; + if (LOG.isDebugEnabled()) { + lastMillis = System.currentTimeMillis(); + } else { + lastMillis = 0; + } boolean hasMore; do { hasMore = scanner.next(kvs, compactionKVMax); @@ -240,11 +245,22 @@ public abstract class Compactor { } writer.append(kv); ++progress.currentCompactedKVs; + progress.totalCompactedSize += kv.getLength(); // check periodically to see if a system stop is requested if (closeCheckInterval > 0) { bytesWritten += kv.getLength(); if (bytesWritten > closeCheckInterval) { + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + long now = System.currentTimeMillis(); + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", + (bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0))); + lastMillis = now; + } + } bytesWritten = 0; if (!store.areWritesEnabled()) { progress.cancel(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 875fe3572e0..036af480493 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -211,4 +211,34 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe return 1024000; } + @Override + public long getFlushedCellsCount() { + return 100000000; + } + + @Override + public long getCompactedCellsCount() { + return 10000000; + } + + @Override + public long getMajorCompactedCellsCount() { + return 1000000; + } + + @Override + public long getFlushedCellsSize() { + return 1024000000; + } + + @Override + public long getCompactedCellsSize() { + return 102400000; + } + + @Override + public long getMajorCompactedCellsSize() { + return 10240000; + } + } \ No newline at end of file