From 5ccadb10ba8747eaab87b03bf97c47afedebbf4b Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 27 Aug 2011 04:26:03 +0000 Subject: [PATCH] HBASE-3900 Expose progress of a major compaction in UI and/or in shell git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1162295 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../org/apache/hadoop/hbase/HServerLoad.java | 60 ++++++++++++++++++- .../hbase/regionserver/HRegionServer.java | 15 ++++- .../hadoop/hbase/regionserver/Store.java | 27 +++++++-- .../compactions/CompactionRequest.java | 2 +- .../hbase/regionserver/TestCompaction.java | 29 +++++++-- 6 files changed, 120 insertions(+), 15 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 689f1d34ab1..0814a09a7c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -410,6 +410,8 @@ Release 0.91.0 - Unreleased HBASE-4241 Optimize flushing of the Memstore (Lars Hofhansl) HBASE-4248 Enhancements for Filter Language exposing HBase filters through the Thrift API (Anirudh Todi) + HBASE-3900 Expose progress of a major compaction in UI and/or in shell + (Brad Anderson) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/HServerLoad.java b/src/main/java/org/apache/hadoop/hbase/HServerLoad.java index d7820340db8..2fd2d0f20b8 100644 --- a/src/main/java/org/apache/hadoop/hbase/HServerLoad.java +++ b/src/main/java/org/apache/hadoop/hbase/HServerLoad.java @@ -97,6 +97,10 @@ implements WritableComparable { private int readRequestsCount; /** the current total write requests made to region */ private int writeRequestsCount; + /** the total compacting key values in currently running compaction */ + private long totalCompactingKVs; + /** the completed count of key values in currently running compaction */ + private long currentCompactedKVs; /** The current total size of root-level indexes for the region, in KB. 
*/ private int rootIndexSizeKB; @@ -121,11 +125,14 @@ implements WritableComparable { * @param name * @param stores * @param storefiles + * @param storeUncompressedSizeMB * @param storefileSizeMB * @param memstoreSizeMB * @param storefileIndexSizeMB * @param readRequestsCount * @param writeRequestsCount + * @param totalCompactingKVs + * @param currentCompactedKVs */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storeUncompressedSizeMB, @@ -133,7 +140,8 @@ implements WritableComparable { final int memstoreSizeMB, final int storefileIndexSizeMB, final int rootIndexSizeKB, final int totalStaticIndexSizeKB, final int totalStaticBloomSizeKB, - final int readRequestsCount, final int writeRequestsCount) { + final int readRequestsCount, final int writeRequestsCount, + final long totalCompactingKVs, final long currentCompactedKVs) { this.name = name; this.stores = stores; this.storefiles = storefiles; @@ -146,6 +154,8 @@ implements WritableComparable { this.totalStaticBloomSizeKB = totalStaticBloomSizeKB; this.readRequestsCount = readRequestsCount; this.writeRequestsCount = writeRequestsCount; + this.totalCompactingKVs = totalCompactingKVs; + this.currentCompactedKVs = currentCompactedKVs; } // Getters @@ -198,7 +208,7 @@ implements WritableComparable { public int getStorefileIndexSizeMB() { return storefileIndexSizeMB; } - + /** * @return the number of requests made to region */ @@ -220,6 +230,20 @@ implements WritableComparable { return writeRequestsCount; } + /** + * @return the total number of kvs in current compaction + */ + public long getTotalCompactingKVs() { + return totalCompactingKVs; + } + + /** + * @return the number of already compacted kvs in current compaction + */ + public long getCurrentCompactedKVs() { + return currentCompactedKVs; + } + // Setters /** @@ -272,6 +296,21 @@ implements WritableComparable { this.writeRequestsCount = requestsCount; } + /** + * @param totalCompactingKVs the number of kvs total in 
current compaction + */ + public void setTotalCompactingKVs(int totalCompactingKVs) { + this.totalCompactingKVs = totalCompactingKVs; + } + + /** + * @param currentCompactedKVs the number of kvs already compacted in + * current compaction + */ + public void setCurrentCompactedKVs(int currentCompactedKVs) { + this.currentCompactedKVs = currentCompactedKVs; + } + // Writable public void readFields(DataInput in) throws IOException { super.readFields(in); @@ -291,6 +330,8 @@ implements WritableComparable { this.rootIndexSizeKB = in.readInt(); this.totalStaticIndexSizeKB = in.readInt(); this.totalStaticBloomSizeKB = in.readInt(); + this.totalCompactingKVs = in.readInt(); + this.currentCompactedKVs = in.readInt(); } public void write(DataOutput out) throws IOException { @@ -309,6 +350,8 @@ implements WritableComparable { out.writeInt(rootIndexSizeKB); out.writeInt(totalStaticIndexSizeKB); out.writeInt(totalStaticBloomSizeKB); + out.writeLong(totalCompactingKVs); + out.writeLong(currentCompactedKVs); } /** @@ -327,7 +370,7 @@ implements WritableComparable { if (this.storeUncompressedSizeMB != 0) { sb = Strings.appendKeyValue(sb, "compressionRatio", String.format("%.4f", (float)this.storefileSizeMB/ - (float)this.storeUncompressedSizeMB)); + (float)this.storeUncompressedSizeMB)); } sb = Strings.appendKeyValue(sb, "memstoreSizeMB", Integer.valueOf(this.memstoreSizeMB)); @@ -343,6 +386,17 @@ implements WritableComparable { Integer.valueOf(this.totalStaticIndexSizeKB)); sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB", Integer.valueOf(this.totalStaticBloomSizeKB)); + sb = Strings.appendKeyValue(sb, "totalCompactingKVs", + Long.valueOf(this.totalCompactingKVs)); + sb = Strings.appendKeyValue(sb, "currentCompactedKVs", + Long.valueOf(this.currentCompactedKVs)); + float compactionProgressPct = Float.NaN; + if( this.totalCompactingKVs > 0 ) { + compactionProgressPct = Float.valueOf( + this.currentCompactedKVs / this.totalCompactingKVs); + } + sb = 
Strings.appendKeyValue(sb, "compactionProgressPct", + compactionProgressPct); return sb.toString(); } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2990fa95873..d86d15b4cac 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.Invocation; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; @@ -631,9 +632,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, closeUserRegions(this.abortRequested); } else if (this.stopping) { LOG.info("Stopping meta regions, if the HRegionServer hosts any"); - boolean allUserRegionsOffline = areAllUserRegionsOffline(); - if (allUserRegionsOffline) { // Set stopped if no requests since last time we went around the loop. // The remaining meta regions will be closed on our way out. 
@@ -926,6 +925,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, int rootIndexSizeKB = 0; int totalStaticIndexSizeKB = 0; int totalStaticBloomSizeKB = 0; + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; synchronized (r.stores) { stores += r.stores.size(); for (Store store : r.stores.values()) { @@ -934,6 +935,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, / 1024 / 1024); storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024); + CompactionProgress progress = store.getCompactionProgress(); + if (progress != null) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + } rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024); @@ -949,7 +955,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, storeUncompressedSizeMB, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB, totalStaticIndexSizeKB, totalStaticBloomSizeKB, - (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get()); + (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), + totalCompactingKVs, currentCompactedKVs); } /** @@ -2531,6 +2538,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, int storefileSizeMB = 0; int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024); int storefileIndexSizeMB = 0; + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; synchronized (r.stores) { stores += r.stores.size(); for (Store store : r.stores.values()) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 778a328455d..cfd402ba252 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ 
-37,14 +37,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -112,6 +118,7 @@ public class Store implements HeapSize { final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final String storeNameStr; private final boolean inMemory; + private CompactionProgress progress; /* * List of store files inside this store. 
This is an immutable list that @@ -766,7 +773,7 @@ public class Store implements HeapSize { * @param dir * @throws IOException */ - public static long getLowestTimestamp(final List candidates) + public static long getLowestTimestamp(final List candidates) throws IOException { long minTs = Long.MAX_VALUE; for (StoreFile storeFile : candidates) { @@ -775,6 +782,13 @@ public class Store implements HeapSize { return minTs; } + /** getter for CompactionProgress object + * @return CompactionProgress object + */ + public CompactionProgress getCompactionProgress() { + return this.progress; + } + /* * @return True if we should run a major compaction. */ @@ -830,7 +844,7 @@ public class Store implements HeapSize { } } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { LOG.debug("Major compaction triggered on store " + this.storeNameStr + - ", because keyvalues outdated; time since last major compaction " + + ", because keyvalues outdated; time since last major compaction " + (now - lowTimestamp) + "ms"); result = true; } @@ -1090,6 +1104,9 @@ public class Store implements HeapSize { } } + // keep track of compaction progress + progress = new CompactionProgress(maxKeyCount); + // For each file, obtain a scanner: List scanners = StoreFileScanner .getScannersForStoreFiles(filesToCompact, false, false); @@ -1117,6 +1134,8 @@ public class Store implements HeapSize { // output to writer: for (KeyValue kv : kvs) { writer.append(kv); + // update progress per key + ++progress.currentCompactedKVs; // check periodically to see if a system stop is requested if (Store.closeCheckInterval > 0) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 6c8a185f81a..e287960a185 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ 
b/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -77,7 +77,7 @@ public class CompactionRequest implements Comparable, /** * This function will define where in the priority queue the request will * end up. Those with the highest priorities will be first. When the - * priorities are the same it will It will first compare priority then date + * priorities are the same it will first compare priority then date * to maintain a FIFO functionality. * *

Note: The date is only accurate to the millisecond which means it is diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 04a2d13d2e3..27ed6bf2490 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -19,9 +19,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,20 +37,18 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.MiniDFSCluster; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; /** @@ -161,9 +165,26 @@ public class TestCompaction extends HBaseTestCase { Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertEquals(compactionThreshold, result.size()); + // see if CompactionProgress is in 
place but null + for (Store store: this.r.stores.values()) { + assertNull(store.getCompactionProgress()); + } + r.flushcache(); r.compactStores(true); + // see if CompactionProgress has done its thing on at least one store + int storeCount = 0; + for (Store store: this.r.stores.values()) { + CompactionProgress progress = store.getCompactionProgress(); + if( progress != null ) { + ++storeCount; + assert(progress.currentCompactedKVs > 0); + assert(progress.totalCompactingKVs > 0); + } + } + assert(storeCount > 0); + // look at the second row // Increment the least significant character so we get to next row. byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);