HBASE-3900 Expose progress of a major compaction in UI and/or in shell
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1162295 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d25412df7
commit
5ccadb10ba
|
@ -410,6 +410,8 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-4241 Optimize flushing of the Memstore (Lars Hofhansl)
|
||||
HBASE-4248 Enhancements for Filter Language exposing HBase filters through
|
||||
the Thrift API (Anirudh Todi)
|
||||
HBASE-3900 Expose progress of a major compaction in UI and/or in shell
|
||||
(Brad Anderson)
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -97,6 +97,10 @@ implements WritableComparable<HServerLoad> {
|
|||
private int readRequestsCount;
|
||||
/** the current total write requests made to region */
|
||||
private int writeRequestsCount;
|
||||
/** the total compacting key values in currently running compaction */
|
||||
private long totalCompactingKVs;
|
||||
/** the completed count of key values in currently running compaction */
|
||||
private long currentCompactedKVs;
|
||||
|
||||
/** The current total size of root-level indexes for the region, in KB. */
|
||||
private int rootIndexSizeKB;
|
||||
|
@ -121,11 +125,14 @@ implements WritableComparable<HServerLoad> {
|
|||
* @param name
|
||||
* @param stores
|
||||
* @param storefiles
|
||||
* @param storeUncompressedSizeMB
|
||||
* @param storefileSizeMB
|
||||
* @param memstoreSizeMB
|
||||
* @param storefileIndexSizeMB
|
||||
* @param readRequestsCount
|
||||
* @param writeRequestsCount
|
||||
* @param totalCompactingKVs
|
||||
* @param currentCompactedKVs
|
||||
*/
|
||||
public RegionLoad(final byte[] name, final int stores,
|
||||
final int storefiles, final int storeUncompressedSizeMB,
|
||||
|
@ -133,7 +140,8 @@ implements WritableComparable<HServerLoad> {
|
|||
final int memstoreSizeMB, final int storefileIndexSizeMB,
|
||||
final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
|
||||
final int totalStaticBloomSizeKB,
|
||||
final int readRequestsCount, final int writeRequestsCount) {
|
||||
final int readRequestsCount, final int writeRequestsCount,
|
||||
final long totalCompactingKVs, final long currentCompactedKVs) {
|
||||
this.name = name;
|
||||
this.stores = stores;
|
||||
this.storefiles = storefiles;
|
||||
|
@ -146,6 +154,8 @@ implements WritableComparable<HServerLoad> {
|
|||
this.totalStaticBloomSizeKB = totalStaticBloomSizeKB;
|
||||
this.readRequestsCount = readRequestsCount;
|
||||
this.writeRequestsCount = writeRequestsCount;
|
||||
this.totalCompactingKVs = totalCompactingKVs;
|
||||
this.currentCompactedKVs = currentCompactedKVs;
|
||||
}
|
||||
|
||||
// Getters
|
||||
|
@ -198,7 +208,7 @@ implements WritableComparable<HServerLoad> {
|
|||
public int getStorefileIndexSizeMB() {
|
||||
return storefileIndexSizeMB;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the number of requests made to region
|
||||
*/
|
||||
|
@ -220,6 +230,20 @@ implements WritableComparable<HServerLoad> {
|
|||
return writeRequestsCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the total number of kvs in current compaction
|
||||
*/
|
||||
public long getTotalCompactingKVs() {
|
||||
return totalCompactingKVs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of already compacted kvs in current compaction
|
||||
*/
|
||||
public long getCurrentCompactedKVs() {
|
||||
return currentCompactedKVs;
|
||||
}
|
||||
|
||||
// Setters
|
||||
|
||||
/**
|
||||
|
@ -272,6 +296,21 @@ implements WritableComparable<HServerLoad> {
|
|||
this.writeRequestsCount = requestsCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param totalCompactingKVs the number of kvs total in current compaction
|
||||
*/
|
||||
public void setTotalCompactingKVs(int totalCompactingKVs) {
|
||||
this.totalCompactingKVs = totalCompactingKVs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param currentCompactedKVs the number of kvs already compacted in
|
||||
* current compaction
|
||||
*/
|
||||
public void setCurrentCompactedKVs(int currentCompactedKVs) {
|
||||
this.currentCompactedKVs = currentCompactedKVs;
|
||||
}
|
||||
|
||||
// Writable
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
|
@ -291,6 +330,8 @@ implements WritableComparable<HServerLoad> {
|
|||
this.rootIndexSizeKB = in.readInt();
|
||||
this.totalStaticIndexSizeKB = in.readInt();
|
||||
this.totalStaticBloomSizeKB = in.readInt();
|
||||
this.totalCompactingKVs = in.readInt();
|
||||
this.currentCompactedKVs = in.readInt();
|
||||
}
|
||||
|
||||
public void write(DataOutput out) throws IOException {
|
||||
|
@ -309,6 +350,8 @@ implements WritableComparable<HServerLoad> {
|
|||
out.writeInt(rootIndexSizeKB);
|
||||
out.writeInt(totalStaticIndexSizeKB);
|
||||
out.writeInt(totalStaticBloomSizeKB);
|
||||
out.writeLong(totalCompactingKVs);
|
||||
out.writeLong(currentCompactedKVs);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -327,7 +370,7 @@ implements WritableComparable<HServerLoad> {
|
|||
if (this.storeUncompressedSizeMB != 0) {
|
||||
sb = Strings.appendKeyValue(sb, "compressionRatio",
|
||||
String.format("%.4f", (float)this.storefileSizeMB/
|
||||
(float)this.storeUncompressedSizeMB));
|
||||
(float)this.storeUncompressedSizeMB));
|
||||
}
|
||||
sb = Strings.appendKeyValue(sb, "memstoreSizeMB",
|
||||
Integer.valueOf(this.memstoreSizeMB));
|
||||
|
@ -343,6 +386,17 @@ implements WritableComparable<HServerLoad> {
|
|||
Integer.valueOf(this.totalStaticIndexSizeKB));
|
||||
sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
|
||||
Integer.valueOf(this.totalStaticBloomSizeKB));
|
||||
sb = Strings.appendKeyValue(sb, "totalCompactingKVs",
|
||||
Long.valueOf(this.totalCompactingKVs));
|
||||
sb = Strings.appendKeyValue(sb, "currentCompactedKVs",
|
||||
Long.valueOf(this.currentCompactedKVs));
|
||||
float compactionProgressPct = Float.NaN;
|
||||
if( this.totalCompactingKVs > 0 ) {
|
||||
compactionProgressPct = Float.valueOf(
|
||||
this.currentCompactedKVs / this.totalCompactingKVs);
|
||||
}
|
||||
sb = Strings.appendKeyValue(sb, "compactionProgressPct",
|
||||
compactionProgressPct);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.Invocation;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
|
||||
|
@ -631,9 +632,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
closeUserRegions(this.abortRequested);
|
||||
} else if (this.stopping) {
|
||||
LOG.info("Stopping meta regions, if the HRegionServer hosts any");
|
||||
|
||||
boolean allUserRegionsOffline = areAllUserRegionsOffline();
|
||||
|
||||
if (allUserRegionsOffline) {
|
||||
// Set stopped if no requests since last time we went around the loop.
|
||||
// The remaining meta regions will be closed on our way out.
|
||||
|
@ -926,6 +925,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
int rootIndexSizeKB = 0;
|
||||
int totalStaticIndexSizeKB = 0;
|
||||
int totalStaticBloomSizeKB = 0;
|
||||
long totalCompactingKVs = 0;
|
||||
long currentCompactedKVs = 0;
|
||||
synchronized (r.stores) {
|
||||
stores += r.stores.size();
|
||||
for (Store store : r.stores.values()) {
|
||||
|
@ -934,6 +935,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
/ 1024 / 1024);
|
||||
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
|
||||
storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
|
||||
CompactionProgress progress = store.getCompactionProgress();
|
||||
if (progress != null) {
|
||||
totalCompactingKVs += progress.totalCompactingKVs;
|
||||
currentCompactedKVs += progress.currentCompactedKVs;
|
||||
}
|
||||
|
||||
rootIndexSizeKB +=
|
||||
(int) (store.getStorefilesIndexSize() / 1024);
|
||||
|
@ -949,7 +955,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
storeUncompressedSizeMB,
|
||||
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
|
||||
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
|
||||
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get());
|
||||
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
|
||||
totalCompactingKVs, currentCompactedKVs);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2531,6 +2538,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
int storefileSizeMB = 0;
|
||||
int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
|
||||
int storefileIndexSizeMB = 0;
|
||||
long totalCompactingKVs = 0;
|
||||
long currentCompactedKVs = 0;
|
||||
synchronized (r.stores) {
|
||||
stores += r.stores.size();
|
||||
for (Store store : r.stores.values()) {
|
||||
|
|
|
@ -37,14 +37,20 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -112,6 +118,7 @@ public class Store implements HeapSize {
|
|||
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final String storeNameStr;
|
||||
private final boolean inMemory;
|
||||
private CompactionProgress progress;
|
||||
|
||||
/*
|
||||
* List of store files inside this store. This is an immutable list that
|
||||
|
@ -766,7 +773,7 @@ public class Store implements HeapSize {
|
|||
* @param dir
|
||||
* @throws IOException
|
||||
*/
|
||||
public static long getLowestTimestamp(final List<StoreFile> candidates)
|
||||
public static long getLowestTimestamp(final List<StoreFile> candidates)
|
||||
throws IOException {
|
||||
long minTs = Long.MAX_VALUE;
|
||||
for (StoreFile storeFile : candidates) {
|
||||
|
@ -775,6 +782,13 @@ public class Store implements HeapSize {
|
|||
return minTs;
|
||||
}
|
||||
|
||||
/** getter for CompactionProgress object
|
||||
* @return CompactionProgress object
|
||||
*/
|
||||
public CompactionProgress getCompactionProgress() {
|
||||
return this.progress;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return True if we should run a major compaction.
|
||||
*/
|
||||
|
@ -830,7 +844,7 @@ public class Store implements HeapSize {
|
|||
}
|
||||
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
|
||||
LOG.debug("Major compaction triggered on store " + this.storeNameStr +
|
||||
", because keyvalues outdated; time since last major compaction " +
|
||||
", because keyvalues outdated; time since last major compaction " +
|
||||
(now - lowTimestamp) + "ms");
|
||||
result = true;
|
||||
}
|
||||
|
@ -1090,6 +1104,9 @@ public class Store implements HeapSize {
|
|||
}
|
||||
}
|
||||
|
||||
// keep track of compaction progress
|
||||
progress = new CompactionProgress(maxKeyCount);
|
||||
|
||||
// For each file, obtain a scanner:
|
||||
List<StoreFileScanner> scanners = StoreFileScanner
|
||||
.getScannersForStoreFiles(filesToCompact, false, false);
|
||||
|
@ -1117,6 +1134,8 @@ public class Store implements HeapSize {
|
|||
// output to writer:
|
||||
for (KeyValue kv : kvs) {
|
||||
writer.append(kv);
|
||||
// update progress per key
|
||||
++progress.currentCompactedKVs;
|
||||
|
||||
// check periodically to see if a system stop is requested
|
||||
if (Store.closeCheckInterval > 0) {
|
||||
|
|
|
@ -77,7 +77,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
|
|||
/**
|
||||
* This function will define where in the priority queue the request will
|
||||
* end up. Those with the highest priorities will be first. When the
|
||||
* priorities are the same it will It will first compare priority then date
|
||||
* priorities are the same it will first compare priority then date
|
||||
* to maintain a FIFO functionality.
|
||||
*
|
||||
* <p>Note: The date is only accurate to the millisecond which means it is
|
||||
|
|
|
@ -19,9 +19,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -31,20 +37,18 @@ import org.apache.hadoop.hbase.HBaseTestCase;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -161,9 +165,26 @@ public class TestCompaction extends HBaseTestCase {
|
|||
Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
|
||||
assertEquals(compactionThreshold, result.size());
|
||||
|
||||
// see if CompactionProgress is in place but null
|
||||
for (Store store: this.r.stores.values()) {
|
||||
assertNull(store.getCompactionProgress());
|
||||
}
|
||||
|
||||
r.flushcache();
|
||||
r.compactStores(true);
|
||||
|
||||
// see if CompactionProgress has done its thing on at least one store
|
||||
int storeCount = 0;
|
||||
for (Store store: this.r.stores.values()) {
|
||||
CompactionProgress progress = store.getCompactionProgress();
|
||||
if( progress != null ) {
|
||||
++storeCount;
|
||||
assert(progress.currentCompactedKVs > 0);
|
||||
assert(progress.totalCompactingKVs > 0);
|
||||
}
|
||||
assert(storeCount > 0);
|
||||
}
|
||||
|
||||
// look at the second row
|
||||
// Increment the least significant character so we get to next row.
|
||||
byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
|
||||
|
|
Loading…
Reference in New Issue