HBASE-8665 bad compaction priority behavior in queue can cause store to be blocked

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1491661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-06-11 00:24:21 +00:00
parent 30c8032cb2
commit 03dc97a1f4
5 changed files with 408 additions and 34 deletions

View File

@ -152,7 +152,7 @@ public class CompactSplitThread implements CompactionRequestor {
queueLists.append(" "+it.next().toString());
queueLists.append("\n");
}
if( smallCompactions != null ){
queueLists.append("\n");
queueLists.append(" SmallCompation Queue:\n");
@ -248,14 +248,21 @@ public class CompactSplitThread implements CompactionRequestor {
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
return requestCompactionInternal(r, why, p, requests, true);
}
private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret;
List<CompactionRequest> ret = null;
if (requests == null) {
ret = new ArrayList<CompactionRequest>(r.getStores().size());
ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
for (Store s : r.getStores().values()) {
ret.add(requestCompaction(r, s, why, p, null));
CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
if (selectNow) ret.add(cr);
}
} else {
Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
ret = new ArrayList<CompactionRequest>(requests.size());
for (Pair<CompactionRequest, Store> pair : requests) {
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
@ -264,6 +271,21 @@ public class CompactSplitThread implements CompactionRequestor {
return ret;
}
public CompactionRequest requestCompaction(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request) throws IOException {
return requestCompactionInternal(r, s, why, priority, request, true);
}
public synchronized void requestSystemCompaction(
final HRegion r, final String why) throws IOException {
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
}
public void requestSystemCompaction(
final HRegion r, final Store s, final String why) throws IOException {
requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
}
/**
* @param r HRegion store belongs to
* @param s Store to request compaction on
@ -272,34 +294,48 @@ public class CompactSplitThread implements CompactionRequestor {
* @param request custom compaction request. Can be <tt>null</tt> in which case a simple
* compaction will be used.
*/
public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request) throws IOException {
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
final String why, int priority, CompactionRequest request, boolean selectNow)
throws IOException {
if (this.server.isStopped()) {
return null;
}
CompactionContext compaction = null;
if (selectNow) {
compaction = selectCompaction(r, s, priority, request);
if (compaction == null) return null; // message logged inside
}
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
long size = selectNow ? compaction.getRequest().getSize() : 0;
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction, pool));
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
}
return selectNow ? compaction.getRequest() : null;
}
private CompactionContext selectCompaction(final HRegion r, final Store s,
int priority, CompactionRequest request) throws IOException {
CompactionContext compaction = s.requestCompaction(priority, request);
if (compaction == null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + r.getRegionNameAsString() +
LOG.debug("Not compacting " + r.getRegionNameAsString() +
" because compaction request was cancelled");
}
return null;
}
assert compaction.hasSelection();
if (priority != Store.NO_PRIORITY) {
compaction.getRequest().setPriority(priority);
}
ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
? largeCompactions : smallCompactions;
pool.execute(new CompactionRunner(s, r, compaction));
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + compaction
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this);
}
return compaction.getRequest();
return compaction;
}
/**
@ -358,18 +394,25 @@ public class CompactSplitThread implements CompactionRequestor {
private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
private final Store store;
private final HRegion region;
private final CompactionContext compaction;
private CompactionContext compaction;
private int queuedPriority;
private ThreadPoolExecutor parent;
public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
public CompactionRunner(Store store, HRegion region,
CompactionContext compaction, ThreadPoolExecutor parent) {
super();
this.store = store;
this.region = region;
this.compaction = compaction;
this.queuedPriority = (this.compaction == null)
? store.getCompactPriority() : compaction.getRequest().getPriority();
this.parent = parent;
}
@Override
public String toString() {
return "Request = " + compaction.getRequest();
return (this.compaction != null) ? ("Request = " + compaction.getRequest())
: ("Store = " + store.toString() + ", pri = " + queuedPriority);
}
@Override
@ -378,6 +421,40 @@ public class CompactSplitThread implements CompactionRequestor {
if (server.isStopped()) {
return;
}
// Common case - system compaction without a file selection. Select now.
if (this.compaction == null) {
int oldPriority = this.queuedPriority;
this.queuedPriority = this.store.getCompactPriority();
if (this.queuedPriority > oldPriority) {
// Store priority decreased while we were in queue (due to some other compaction?),
// requeue with new priority to avoid blocking potential higher priorities.
this.parent.execute(this);
return;
}
try {
this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
} catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem();
return;
}
if (this.compaction == null) return; // nothing to do
// Now see if we are in correct pool for the size; if not, go to the correct one.
// We might end up waiting for a while, so cancel the selection.
assert this.compaction.hasSelection();
ThreadPoolExecutor pool = store.throttleCompaction(
compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
if (this.parent != pool) {
this.store.cancelRequestedCompaction(this.compaction);
this.compaction = null;
this.parent = pool;
this.parent.execute(this);
return;
}
}
// Finally we can compact something.
assert this.compaction != null;
this.compaction.getRequest().beforeExecute();
try {
// Note: please don't put single-compaction logic here;
@ -390,7 +467,7 @@ public class CompactSplitThread implements CompactionRequestor {
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
requestCompaction(region, store, "Recursive enqueue", null);
requestSystemCompaction(region, store, "Recursive enqueue");
} else {
// see if the compaction has caused us to exceed max region size
requestSplit(region);
@ -422,8 +499,13 @@ public class CompactSplitThread implements CompactionRequestor {
@Override
public int compareTo(CompactionRunner o) {
// Only compare the underlying request, for queue sorting purposes.
return this.compaction.getRequest().compareTo(o.compaction.getRequest());
// Only compare the underlying request (if any), for queue sorting purposes.
int compareVal = queuedPriority - o.queuedPriority; // compare priority
if (compareVal != 0) return compareVal;
CompactionContext tc = this.compaction, oc = o.compaction;
// Sort pre-selected (user?) compactions before system ones with equal priority.
return (tc == null) ? ((oc == null) ? 0 : 1)
: ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
}
}

View File

@ -1330,8 +1330,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (iteration % multiplier != 0) continue;
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests compaction", null);
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+ " requests compaction");
} else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
@ -1690,7 +1690,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// Do checks to see if we need to compact (references or too many files)
for (Store s : r.getStores().values()) {
if (s.hasReferences() || s.needsCompaction()) {
getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
}
}
long openSeqNum = r.getOpenSeqNum();
@ -3576,7 +3576,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (shouldFlush) {
boolean result = region.flushcache();
if (result) {
this.compactSplitThread.requestCompaction(region,
this.compactSplitThread.requestSystemCompaction(region,
"Compaction through user triggered flush");
}
builder.setFlushed(result);

View File

@ -408,8 +408,8 @@ class MemStoreFlusher implements FlushRequester {
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
if (!this.server.compactSplitThread.requestSplit(region)) {
try {
this.server.compactSplitThread.requestCompaction(region, Thread
.currentThread().getName());
this.server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
} catch (IOException e) {
LOG.error(
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@ -457,7 +457,8 @@ class MemStoreFlusher implements FlushRequester {
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {

View File

@ -0,0 +1,64 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* This class is a helper that allows to create a partially-implemented, stateful mocks of
* Store. It contains a bunch of blank methods, and answers redirecting to these.
*/
public class StatefulStoreMockMaker {
// Add and expand the methods and answers as needed.
public CompactionContext selectCompaction() { return null; }
public void cancelCompaction(Object originalContext) {}
public int getPriority() { return 0; }
private class SelectAnswer implements Answer<CompactionContext> {
public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
return selectCompaction();
}
}
private class PriorityAnswer implements Answer<Integer> {
public Integer answer(InvocationOnMock invocation) throws Throwable {
return getPriority();
}
}
private class CancelAnswer implements Answer<Object> {
public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
cancelCompaction(invocation.getArguments()[0]); return null;
}
}
public Store createStoreMock(String name) throws Exception {
Store store = mock(Store.class, name);
when(store.requestCompaction(
anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
when(store.getCompactPriority()).then(new PriorityAnswer());
doAnswer(new CancelAnswer()).when(
store).cancelRequestedCompaction(any(CompactionContext.class));
return store;
}
}

View File

@ -18,8 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
@ -33,10 +34,12 @@ import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -53,6 +56,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
@ -60,6 +64,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPoli
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assume;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -730,6 +736,227 @@ public class TestCompaction extends HBaseTestCase {
thread.interruptIfNecessary();
}
private class StoreMockMaker extends StatefulStoreMockMaker {
public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
private ArrayList<Integer> results;
public StoreMockMaker(ArrayList<Integer> results) {
this.results = results;
}
public class TestCompactionContext extends CompactionContext {
private List<StoreFile> selectedFiles;
public TestCompactionContext(List<StoreFile> selectedFiles) {
super();
this.selectedFiles = selectedFiles;
}
@Override
public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
return new ArrayList<StoreFile>();
}
@Override
public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
this.request = new CompactionRequest(selectedFiles);
this.request.setPriority(getPriority());
return true;
}
@Override
public List<Path> compact() throws IOException {
finishCompaction(this.selectedFiles);
return new ArrayList<Path>();
}
}
@Override
public synchronized CompactionContext selectCompaction() {
CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
compacting.addAll(notCompacting);
notCompacting.clear();
try {
ctx.select(null, false, false, false);
} catch (IOException ex) {
fail("Shouldn't happen");
}
return ctx;
}
@Override
public synchronized void cancelCompaction(Object object) {
TestCompactionContext ctx = (TestCompactionContext)object;
compacting.removeAll(ctx.selectedFiles);
notCompacting.addAll(ctx.selectedFiles);
}
public synchronized void finishCompaction(List<StoreFile> sfs) {
if (sfs.isEmpty()) return;
synchronized (results) {
results.add(sfs.size());
}
compacting.removeAll(sfs);
}
@Override
public int getPriority() {
return 7 - compacting.size() - notCompacting.size();
}
}
public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
BlockingCompactionContext blocked = null;
public class BlockingCompactionContext extends CompactionContext {
public volatile boolean isInCompact = false;
public void unblock() {
synchronized (this) { this.notifyAll(); }
}
@Override
public List<Path> compact() throws IOException {
try {
isInCompact = true;
synchronized (this) { this.wait(); }
} catch (InterruptedException e) {
Assume.assumeNoException(e);
}
return new ArrayList<Path>();
}
@Override
public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
return new ArrayList<StoreFile>();
}
@Override
public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
throws IOException {
this.request = new CompactionRequest(new ArrayList<StoreFile>());
return true;
}
}
@Override
public CompactionContext selectCompaction() {
this.blocked = new BlockingCompactionContext();
try {
this.blocked.select(null, false, false, false);
} catch (IOException ex) {
fail("Shouldn't happen");
}
return this.blocked;
}
@Override
public void cancelCompaction(Object object) {}
public int getPriority() {
return Integer.MIN_VALUE; // some invalid value, see createStoreMock
}
public BlockingCompactionContext waitForBlocking() {
while (this.blocked == null || !this.blocked.isInCompact) {
Threads.sleepWithoutInterrupt(50);
}
BlockingCompactionContext ctx = this.blocked;
this.blocked = null;
return ctx;
}
@Override
public Store createStoreMock(String name) throws Exception {
return createStoreMock(Integer.MIN_VALUE, name);
}
public Store createStoreMock(int priority, String name) throws Exception {
// Override the mock to always return the specified priority.
Store s = super.createStoreMock(name);
when(s.getCompactPriority()).thenReturn(priority);
return s;
}
}
/** Test compaction priority management and multiple compactions per store (HBASE-8665). */
public void testCompactionQueuePriorities() throws Exception {
// Setup a compact/split thread on a mock server.
final Configuration conf = HBaseConfiguration.create();
HRegionServer mockServer = mock(HRegionServer.class);
when(mockServer.isStopped()).thenReturn(false);
when(mockServer.getConfiguration()).thenReturn(conf);
CompactSplitThread cst = new CompactSplitThread(mockServer);
when(mockServer.getCompactSplitThread()).thenReturn(cst);
// Set up the region mock that redirects compactions.
HRegion r = mock(HRegion.class);
when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
((CompactionContext)invocation.getArguments()[0]).compact();
return true;
}
});
// Set up store mocks for 2 "real" stores and the one we use for blocking CST.
ArrayList<Integer> results = new ArrayList<Integer>();
StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
// First, block the compaction thread so that we could muck with queue.
cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
// Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
for (int i = 0; i < 4; ++i) {
sm.notCompacting.add(createFile());
}
cst.requestSystemCompaction(r, store, "s1-pri3");
for (int i = 0; i < 3; ++i) {
sm2.notCompacting.add(createFile());
}
cst.requestSystemCompaction(r, store2, "s2-pri4");
// Now add 2 more files to store1 and queue compaction - pri 1.
for (int i = 0; i < 2; ++i) {
sm.notCompacting.add(createFile());
}
cst.requestSystemCompaction(r, store, "s1-pri1");
// Finally add blocking compaction with priority 2.
cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
// Unblock the blocking compaction; we should run pri1 and become block again in pri2.
currentBlock.unblock();
currentBlock = blocker.waitForBlocking();
// Pri1 should have "compacted" all 6 files.
assertEquals(1, results.size());
assertEquals(6, results.get(0).intValue());
// Add 2 files to store 1 (it has 2 files now).
for (int i = 0; i < 2; ++i) {
sm.notCompacting.add(createFile());
}
// Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
// is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
currentBlock.unblock();
currentBlock = blocker.waitForBlocking();
assertEquals(3, results.size());
assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
assertEquals(2, results.get(2).intValue());
currentBlock.unblock();
cst.interruptIfNecessary();
}
private static StoreFile createFile() throws Exception {
StoreFile sf = mock(StoreFile.class);
when(sf.getPath()).thenReturn(new Path("file"));
StoreFile.Reader r = mock(StoreFile.Reader.class);
when(r.length()).thenReturn(10L);
when(sf.getReader()).thenReturn(r);
return sf;
}
/**
* Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.