HBASE-12448 Fix rate reporting in compaction progress DEBUG logging

This commit is contained in:
Andrew Purtell 2014-11-07 18:36:44 -08:00
parent 0145650cb0
commit b1f7d7cd32
1 changed files with 26 additions and 19 deletions

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -228,43 +229,39 @@ public abstract class Compactor {
*/ */
protected boolean performCompaction(InternalScanner scanner, protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException { CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
int bytesWritten = 0; long bytesWritten = 0;
long bytesWrittenProgress = 0;
// Since scanner.next() can return 'false' but still be delivering data, // Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop. // we have to use a do/while loop.
List<Cell> cells = new ArrayList<Cell>(); List<Cell> cells = new ArrayList<Cell>();
int closeCheckInterval = HStore.getCloseCheckInterval(); long closeCheckInterval = HStore.getCloseCheckInterval();
long lastMillis; long lastMillis = 0;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
lastMillis = System.currentTimeMillis(); lastMillis = EnvironmentEdgeManager.currentTime();
} else {
lastMillis = 0;
} }
long now = 0;
boolean hasMore; boolean hasMore;
do { do {
hasMore = scanner.next(cells, compactionKVMax); hasMore = scanner.next(cells, compactionKVMax);
if (LOG.isDebugEnabled()) {
now = EnvironmentEdgeManager.currentTime();
}
// output to writer: // output to writer:
for (Cell c : cells) { for (Cell c : cells) {
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
CellUtil.setSequenceId(c, 0); CellUtil.setSequenceId(c, 0);
} }
writer.append(c); writer.append(c);
int len = KeyValueUtil.length(c);
++progress.currentCompactedKVs; ++progress.currentCompactedKVs;
progress.totalCompactedSize += KeyValueUtil.length(c); progress.totalCompactedSize += len;
if (LOG.isDebugEnabled()) {
bytesWrittenProgress += len;
}
// check periodically to see if a system stop is requested // check periodically to see if a system stop is requested
if (closeCheckInterval > 0) { if (closeCheckInterval > 0) {
bytesWritten += KeyValueUtil.length(c); bytesWritten += len;
if (bytesWritten > closeCheckInterval) { if (bytesWritten > closeCheckInterval) {
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
if ((now - lastMillis) >= 60 * 1000) {
LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
(bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0)));
lastMillis = now;
}
}
bytesWritten = 0; bytesWritten = 0;
if (!store.areWritesEnabled()) { if (!store.areWritesEnabled()) {
progress.cancel(); progress.cancel();
@ -273,6 +270,16 @@ public abstract class Compactor {
} }
} }
} }
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= 60 * 1000) {
LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
(bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
lastMillis = now;
bytesWrittenProgress = 0;
}
}
cells.clear(); cells.clear();
} while (hasMore); } while (hasMore);
progress.complete(); progress.complete();