diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 11cc5ff5001..a2a346f3269 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -152,7 +152,7 @@ public class CompactSplitThread implements CompactionRequestor {
       queueLists.append(" "+it.next().toString());
       queueLists.append("\n");
     }
-    
+
     if( smallCompactions != null ){
       queueLists.append("\n");
       queueLists.append(" SmallCompation Queue:\n");
@@ -248,14 +248,21 @@ public class CompactSplitThread implements CompactionRequestor {
   @Override
   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
+    return requestCompactionInternal(r, why, p, requests, true);
+  }
+
+  private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
     // not a special compaction request, so make our own list
-    List<CompactionRequest> ret;
+    List<CompactionRequest> ret = null;
     if (requests == null) {
-      ret = new ArrayList<CompactionRequest>(r.getStores().size());
+      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
       for (Store s : r.getStores().values()) {
-        ret.add(requestCompaction(r, s, why, p, null));
+        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
+        if (selectNow) ret.add(cr);
       }
     } else {
+      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
       ret = new ArrayList<CompactionRequest>(requests.size());
       for (Pair<CompactionRequest, Store> pair : requests) {
         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
@@ -264,6 +271,21 @@ public class CompactSplitThread implements CompactionRequestor {
     return ret;
   }
 
+  public CompactionRequest requestCompaction(final HRegion r, final Store s,
+      final String why, int priority, CompactionRequest request) throws IOException {
+    return requestCompactionInternal(r, s, why, priority, request, true);
+  }
+
+  public synchronized void requestSystemCompaction(
+      final HRegion r, final String why) throws IOException {
+    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
+  }
+
+  public void requestSystemCompaction(
+      final HRegion r, final Store s, final String why) throws IOException {
+    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
+  }
+
   /**
    * @param r HRegion store belongs to
    * @param s Store to request compaction on
@@ -272,34 +294,48 @@ public class CompactSplitThread implements CompactionRequestor {
    * @param request custom compaction request. Can be null in which case a simple
    *          compaction will be used.
    */
-  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
-      final String why, int priority, CompactionRequest request) throws IOException {
+  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
+      final String why, int priority, CompactionRequest request, boolean selectNow)
+          throws IOException {
     if (this.server.isStopped()) {
       return null;
     }
+
+    CompactionContext compaction = null;
+    if (selectNow) {
+      compaction = selectCompaction(r, s, priority, request);
+      if (compaction == null) return null; // message logged inside
+    }
+
+    // We assume that most compactions are small. So, put system compactions into small
+    // pool; we will do selection there, and move to large pool if necessary.
+    long size = selectNow ? compaction.getRequest().getSize() : 0;
+    ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
+      ? largeCompactions : smallCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction, pool));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == smallCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
+    }
+    return selectNow ? compaction.getRequest() : null;
+  }
+
+  private CompactionContext selectCompaction(final HRegion r, final Store s,
+      int priority, CompactionRequest request) throws IOException {
     CompactionContext compaction = s.requestCompaction(priority, request);
     if (compaction == null) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting " + r.getRegionNameAsString() + 
+        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
       }
       return null;
     }
-
     assert compaction.hasSelection();
     if (priority != Store.NO_PRIORITY) {
       compaction.getRequest().setPriority(priority);
     }
-    ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
-      ? largeCompactions : smallCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction));
-    if (LOG.isDebugEnabled()) {
-      String type = (pool == smallCompactions) ? "Small " : "Large ";
-      LOG.debug(type + "Compaction requested: " + compaction
-          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
-          + "; " + this);
-    }
-    return compaction.getRequest();
+    return compaction;
   }
 
   /**
@@ -358,18 +394,25 @@ public class CompactSplitThread implements CompactionRequestor {
   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
     private final Store store;
     private final HRegion region;
-    private final CompactionContext compaction;
+    private CompactionContext compaction;
+    private int queuedPriority;
+    private ThreadPoolExecutor parent;
 
-    public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
+    public CompactionRunner(Store store, HRegion region,
+        CompactionContext compaction, ThreadPoolExecutor parent) {
       super();
       this.store = store;
       this.region = region;
       this.compaction = compaction;
+      this.queuedPriority = (this.compaction == null)
+          ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.parent = parent;
     }
-    
+
     @Override
     public String toString() {
-      return "Request = " + compaction.getRequest();
+      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
+          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
     }
 
     @Override
@@ -378,6 +421,40 @@ public class CompactSplitThread implements CompactionRequestor {
       if (server.isStopped()) {
         return;
       }
 
+      // Common case - system compaction without a file selection. Select now.
+      if (this.compaction == null) {
+        int oldPriority = this.queuedPriority;
+        this.queuedPriority = this.store.getCompactPriority();
+        if (this.queuedPriority > oldPriority) {
+          // Store priority decreased while we were in queue (due to some other compaction?),
+          // requeue with new priority to avoid blocking potential higher priorities.
+          this.parent.execute(this);
+          return;
+        }
+        try {
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
+        } catch (IOException ex) {
+          LOG.error("Compaction selection failed " + this, ex);
+          server.checkFileSystem();
+          return;
+        }
+        if (this.compaction == null) return; // nothing to do
+        // Now see if we are in correct pool for the size; if not, go to the correct one.
+        // We might end up waiting for a while, so cancel the selection.
+        assert this.compaction.hasSelection();
+        ThreadPoolExecutor pool = store.throttleCompaction(
+            compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
+        if (this.parent != pool) {
+          this.store.cancelRequestedCompaction(this.compaction);
+          this.compaction = null;
+          this.parent = pool;
+          this.parent.execute(this);
+          return;
+        }
+      }
+      // Finally we can compact something.
+      assert this.compaction != null;
+      this.compaction.getRequest().beforeExecute();
       try {
         // Note: please don't put single-compaction logic here;
@@ -390,7 +467,7 @@ public class CompactSplitThread implements CompactionRequestor {
         if (completed) {
           // degenerate case: blocked regions require recursive enqueues
           if (store.getCompactPriority() <= 0) {
-            requestCompaction(region, store, "Recursive enqueue", null);
+            requestSystemCompaction(region, store, "Recursive enqueue");
           } else {
             // see if the compaction has caused us to exceed max region size
             requestSplit(region);
@@ -422,8 +499,13 @@ public class CompactSplitThread implements CompactionRequestor {
 
     @Override
     public int compareTo(CompactionRunner o) {
-      // Only compare the underlying request, for queue sorting purposes.
-      return this.compaction.getRequest().compareTo(o.compaction.getRequest());
+      // Only compare the underlying request (if any), for queue sorting purposes.
+      int compareVal = queuedPriority - o.queuedPriority; // compare priority
+      if (compareVal != 0) return compareVal;
+      CompactionContext tc = this.compaction, oc = o.compaction;
+      // Sort pre-selected (user?) compactions before system ones with equal priority.
+      return (tc == null) ? ((oc == null) ? 0 : 1)
+        : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0f4df9b1a7d..f1f09d1e600 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1330,8 +1330,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           if (iteration % multiplier != 0) continue;
           if (s.needsCompaction()) {
             // Queue a compaction. Will recognize if major is needed.
-            this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                + " requests compaction", null);
+            this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+                + " requests compaction");
           } else if (s.isMajorCompaction()) {
             if (majorCompactPriority == DEFAULT_PRIORITY
                 || majorCompactPriority > r.getCompactPriority()) {
@@ -1690,7 +1690,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       // Do checks to see if we need to compact (references or too many files)
       for (Store s : r.getStores().values()) {
         if (s.hasReferences() || s.needsCompaction()) {
-          getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
+          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
         }
       }
       long openSeqNum = r.getOpenSeqNum();
@@ -3576,7 +3576,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       if (shouldFlush) {
         boolean result = region.flushcache();
         if (result) {
-          this.compactSplitThread.requestCompaction(region,
+          this.compactSplitThread.requestSystemCompaction(region,
             "Compaction through user triggered flush");
         }
         builder.setFlushed(result);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index d54fbac2ad5..e655e3520b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -408,8 +408,8 @@ class MemStoreFlusher implements FlushRequester {
         "store files; delaying flush up to " + this.blockingWaitTime + "ms");
       if (!this.server.compactSplitThread.requestSplit(region)) {
         try {
-          this.server.compactSplitThread.requestCompaction(region, Thread
-              .currentThread().getName());
+          this.server.compactSplitThread.requestSystemCompaction(
+              region, Thread.currentThread().getName());
         } catch (IOException e) {
           LOG.error(
             "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -457,7 +457,8 @@ class MemStoreFlusher implements FlushRequester {
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
+        server.compactSplitThread.requestSystemCompaction(
+            region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
new file mode 100644
index 00000000000..dd9c037f21f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This class is a helper that allows creating partially-implemented, stateful mocks of
+ * Store. It contains a bunch of blank methods, and answers redirecting to these.
+ */
+public class StatefulStoreMockMaker {
+  // Add and expand the methods and answers as needed.
+  public CompactionContext selectCompaction() { return null; }
+  public void cancelCompaction(Object originalContext) {}
+  public int getPriority() { return 0; }
+
+  private class SelectAnswer implements Answer<CompactionContext> {
+    public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
+      return selectCompaction();
+    }
+  }
+  private class PriorityAnswer implements Answer<Integer> {
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      return getPriority();
+    }
+  }
+  private class CancelAnswer implements Answer<CompactionContext> {
+    public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
+      cancelCompaction(invocation.getArguments()[0]); return null;
+    }
+  }
+
+  public Store createStoreMock(String name) throws Exception {
+    Store store = mock(Store.class, name);
+    when(store.requestCompaction(
+        anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
+    when(store.getCompactPriority()).then(new PriorityAnswer());
+    doAnswer(new CancelAnswer()).when(
+        store).cancelRequestedCompaction(any(CompactionContext.class));
+    return store;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 3d5040c1d34..c736378c57b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -18,8 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
+import static org.mockito.AdditionalMatchers.aryEq;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,10 +34,12 @@ import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -53,6 +56,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
@@ -60,6 +64,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPoli
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Assume;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -730,6 +736,227 @@ public class TestCompaction extends HBaseTestCase {
     thread.interruptIfNecessary();
   }
 
+  private class StoreMockMaker extends StatefulStoreMockMaker {
+    public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
+    public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
+    private ArrayList<Integer> results;
+
+    public StoreMockMaker(ArrayList<Integer> results) {
+      this.results = results;
+    }
+
+    public class TestCompactionContext extends CompactionContext {
+      private List<StoreFile> selectedFiles;
+      public TestCompactionContext(List<StoreFile> selectedFiles) {
+        super();
+        this.selectedFiles = selectedFiles;
+      }
+
+      @Override
+      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+        return new ArrayList<StoreFile>();
+      }
+
+      @Override
+      public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+        this.request = new CompactionRequest(selectedFiles);
+        this.request.setPriority(getPriority());
+        return true;
+      }
+
+      @Override
+      public List<Path> compact() throws IOException {
+        finishCompaction(this.selectedFiles);
+        return new ArrayList<Path>();
+      }
+    }
+
+    @Override
+    public synchronized CompactionContext selectCompaction() {
+      CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
+      compacting.addAll(notCompacting);
+      notCompacting.clear();
+      try {
+        ctx.select(null, false, false, false);
+      } catch (IOException ex) {
+        fail("Shouldn't happen");
+      }
+      return ctx;
+    }
+
+    @Override
+    public synchronized void cancelCompaction(Object object) {
+      TestCompactionContext ctx = (TestCompactionContext)object;
+      compacting.removeAll(ctx.selectedFiles);
+      notCompacting.addAll(ctx.selectedFiles);
+    }
+
+    public synchronized void finishCompaction(List<StoreFile> sfs) {
+      if (sfs.isEmpty()) return;
+      synchronized (results) {
+        results.add(sfs.size());
+      }
+      compacting.removeAll(sfs);
+    }
+
+    @Override
+    public int getPriority() {
+      return 7 - compacting.size() - notCompacting.size();
+    }
+  }
+
+  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
+    BlockingCompactionContext blocked = null;
+
+    public class BlockingCompactionContext extends CompactionContext {
+      public volatile boolean isInCompact = false;
+
+      public void unblock() {
+        synchronized (this) { this.notifyAll(); }
+      }
+
+      @Override
+      public List<Path> compact() throws IOException {
+        try {
+          isInCompact = true;
+          synchronized (this) { this.wait(); }
+        } catch (InterruptedException e) {
+          Assume.assumeNoException(e);
+        }
+        return new ArrayList<Path>();
+      }
+
+      @Override
+      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+        return new ArrayList<StoreFile>();
+      }
+
+      @Override
+      public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
+          throws IOException {
+        this.request = new CompactionRequest(new ArrayList<StoreFile>());
+        return true;
+      }
+    }
+
+    @Override
+    public CompactionContext selectCompaction() {
+      this.blocked = new BlockingCompactionContext();
+      try {
+        this.blocked.select(null, false, false, false);
+      } catch (IOException ex) {
+        fail("Shouldn't happen");
+      }
+      return this.blocked;
+    }
+
+    @Override
+    public void cancelCompaction(Object object) {}
+
+    public int getPriority() {
+      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
+    }
+
+    public BlockingCompactionContext waitForBlocking() {
+      while (this.blocked == null || !this.blocked.isInCompact) {
+        Threads.sleepWithoutInterrupt(50);
+      }
+      BlockingCompactionContext ctx = this.blocked;
+      this.blocked = null;
+      return ctx;
+    }
+
+    @Override
+    public Store createStoreMock(String name) throws Exception {
+      return createStoreMock(Integer.MIN_VALUE, name);
+    }
+
+    public Store createStoreMock(int priority, String name) throws Exception {
+      // Override the mock to always return the specified priority.
+      Store s = super.createStoreMock(name);
+      when(s.getCompactPriority()).thenReturn(priority);
+      return s;
+    }
+  }
+
+  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
+  public void testCompactionQueuePriorities() throws Exception {
+    // Set up a compact/split thread on a mock server.
+    final Configuration conf = HBaseConfiguration.create();
+    HRegionServer mockServer = mock(HRegionServer.class);
+    when(mockServer.isStopped()).thenReturn(false);
+    when(mockServer.getConfiguration()).thenReturn(conf);
+    CompactSplitThread cst = new CompactSplitThread(mockServer);
+    when(mockServer.getCompactSplitThread()).thenReturn(cst);
+
+    // Set up the region mock that redirects compactions.
+    HRegion r = mock(HRegion.class);
+    when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>() {
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        ((CompactionContext)invocation.getArguments()[0]).compact();
+        return true;
+      }
+    });
+
+    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
+    ArrayList<Integer> results = new ArrayList<Integer>();
+    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
+    Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
+    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
+
+    // First, block the compaction thread so that we can muck with the queue.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
+    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
+
+    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
+    for (int i = 0; i < 4; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store, "s1-pri3");
+    for (int i = 0; i < 3; ++i) {
+      sm2.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store2, "s2-pri4");
+    // Now add 2 more files to store1 and queue compaction - pri 1.
+    for (int i = 0; i < 2; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store, "s1-pri1");
+    // Finally add blocking compaction with priority 2.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
+
+    // Unblock the blocking compaction; we should run pri1 and become blocked again in pri2.
+    currentBlock.unblock();
+    currentBlock = blocker.waitForBlocking();
+    // Pri1 should have "compacted" all 6 files.
+    assertEquals(1, results.size());
+    assertEquals(6, results.get(0).intValue());
+    // Add 2 files to store 1 (it has 2 files now).
+    for (int i = 0; i < 2; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
+    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
+    currentBlock.unblock();
+    currentBlock = blocker.waitForBlocking();
+    assertEquals(3, results.size());
+    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
+    assertEquals(2, results.get(2).intValue());
+
+    currentBlock.unblock();
+    cst.interruptIfNecessary();
+  }
+
+  private static StoreFile createFile() throws Exception {
+    StoreFile sf = mock(StoreFile.class);
+    when(sf.getPath()).thenReturn(new Path("file"));
+    StoreFile.Reader r = mock(StoreFile.Reader.class);
+    when(r.length()).thenReturn(10L);
+    when(sf.getReader()).thenReturn(r);
+    return sf;
+  }
 
   /**
    * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.