HBASE-3043 'hbase-daemon.sh stop regionserver' should kill compactions that are in progress

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1004340 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-10-04 17:59:38 +00:00
parent 70ced466fc
commit 18d7fff188
4 changed files with 179 additions and 27 deletions

View File

@ -96,14 +96,16 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
HRegion r = null; HRegion r = null;
try { try {
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null && !this.server.isStopped()) { if (r != null) {
lock.lock(); lock.lock();
try { try {
// Don't interrupt us while we are working if(!this.server.isStopped()) {
byte [] midKey = r.compactStores(); // Don't interrupt us while we are working
if (shouldSplitRegion() && midKey != null && byte [] midKey = r.compactStores();
!this.server.isStopped()) { if (shouldSplitRegion() && midKey != null &&
split(r, midKey); !this.server.isStopped()) {
split(r, midKey);
}
} }
} finally { } finally {
lock.unlock(); lock.unlock();
@ -208,7 +210,11 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
*/ */
void interruptIfNecessary() { void interruptIfNecessary() {
if (lock.tryLock()) { if (lock.tryLock()) {
this.interrupt(); try {
this.interrupt();
} finally {
lock.unlock();
}
} }
} }

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.AbstractList; import java.util.AbstractList;
@ -207,7 +208,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
private final WriteState writestate = new WriteState(); final WriteState writestate = new WriteState();
final long memstoreFlushSize; final long memstoreFlushSize;
private volatile long lastFlushTime; private volatile long lastFlushTime;
@ -429,6 +430,12 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isClosing() { public boolean isClosing() {
return this.closing.get(); return this.closing.get();
} }
boolean areWritesEnabled() {
synchronized(this.writestate) {
return this.writestate.writesEnabled;
}
}
public ReadWriteConsistencyControl getRWCC() { public ReadWriteConsistencyControl getRWCC() {
return rwcc; return rwcc;
@ -624,7 +631,7 @@ public class HRegion implements HeapSize { // , Writable{
* Do preparation for pending compaction. * Do preparation for pending compaction.
* @throws IOException * @throws IOException
*/ */
private void doRegionCompactionPrep() throws IOException { void doRegionCompactionPrep() throws IOException {
} }
/* /*
@ -717,16 +724,24 @@ public class HRegion implements HeapSize { // , Writable{
long startTime = EnvironmentEdgeManager.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
doRegionCompactionPrep(); doRegionCompactionPrep();
long maxSize = -1; long maxSize = -1;
for (Store store: stores.values()) { boolean completed = false;
final Store.StoreSize ss = store.compact(majorCompaction); try {
if (ss != null && ss.getSize() > maxSize) { for (Store store: stores.values()) {
maxSize = ss.getSize(); final Store.StoreSize ss = store.compact(majorCompaction);
splitRow = ss.getSplitRow(); if (ss != null && ss.getSize() > maxSize) {
maxSize = ss.getSize();
splitRow = ss.getSplitRow();
}
} }
completed = true;
} catch (InterruptedIOException iioe) {
LOG.info("compaction interrupted by user: ", iioe);
} finally {
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted")
+ " compaction on region " + this
+ " after " + StringUtils.formatTimeDiff(now, startTime));
} }
String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
startTime);
LOG.info("compaction completed on region " + this + " in " + timeTaken);
} finally { } finally {
synchronized (writestate) { synchronized (writestate) {
writestate.compacting = false; writestate.compacting = false;

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -90,6 +91,8 @@ public class Store implements HeapSize {
protected long ttl; protected long ttl;
private long majorCompactionTime; private long majorCompactionTime;
private int maxFilesToCompact; private int maxFilesToCompact;
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
private final long desiredMaxFileSize; private final long desiredMaxFileSize;
private volatile long storeSize = 0L; private volatile long storeSize = 0L;
private final Object flushLock = new Object(); private final Object flushLock = new Object();
@ -192,6 +195,10 @@ public class Store implements HeapSize {
} }
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
if (Store.closeCheckInterval == 0) {
Store.closeCheckInterval = conf.getInt(
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
this.storefiles = sortAndClone(loadStoreFiles()); this.storefiles = sortAndClone(loadStoreFiles());
} }
@ -813,23 +820,43 @@ public class Store implements HeapSize {
// where all source cells are expired or deleted. // where all source cells are expired or deleted.
StoreFile.Writer writer = null; StoreFile.Writer writer = null;
try { try {
// NOTE: the majority of the time for a compaction is spent in this section
if (majorCompaction) { if (majorCompaction) {
InternalScanner scanner = null; InternalScanner scanner = null;
try { try {
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions()); scan.setMaxVersions(family.getMaxVersions());
scanner = new StoreScanner(this, scan, scanners); scanner = new StoreScanner(this, scan, scanners);
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data, // since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop. // we have to use a do/while loop.
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
while (scanner.next(kvs)) { while (scanner.next(kvs)) {
// output to writer: if (writer == null && !kvs.isEmpty()) {
for (KeyValue kv : kvs) { writer = createWriterInTmp(maxKeyCount,
if (writer == null) { this.compactionCompression);
writer = createWriterInTmp(maxKeyCount, }
this.compactionCompression); if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
writer.append(kv);
// check periodically to see if a system stop is requested
if (Store.closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
bytesWritten = 0;
if (!this.region.areWritesEnabled()) {
writer.close();
fs.delete(writer.getPath(), false);
throw new InterruptedIOException(
"Aborting compaction of store " + this +
" in region " + this.region +
" because user requested stop.");
}
}
}
} }
writer.append(kv);
} }
kvs.clear(); kvs.clear();
} }
@ -842,9 +869,29 @@ public class Store implements HeapSize {
MinorCompactingStoreScanner scanner = null; MinorCompactingStoreScanner scanner = null;
try { try {
scanner = new MinorCompactingStoreScanner(this, scanners); scanner = new MinorCompactingStoreScanner(this, scanners);
writer = createWriterInTmp(maxKeyCount); if (scanner.peek() != null) {
while (scanner.next(writer)) { writer = createWriterInTmp(maxKeyCount);
// Nothing to do int bytesWritten = 0;
while (scanner.peek() != null) {
KeyValue kv = scanner.next();
writer.append(kv);
// check periodically to see if a system stop is requested
if (Store.closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > Store.closeCheckInterval) {
bytesWritten = 0;
if (!this.region.areWritesEnabled()) {
writer.close();
fs.delete(writer.getPath(), false);
throw new InterruptedIOException(
"Aborting compaction of store " + this +
" in region " + this.region +
" because user requested stop.");
}
}
}
}
} }
} finally { } finally {
if (scanner != null) if (scanner != null)

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -33,12 +34,18 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
/** /**
* Test compactions * Test compactions
@ -217,17 +224,94 @@ public class TestCompaction extends HBaseTestCase {
} }
assertTrue(containsStartRow); assertTrue(containsStartRow);
assertTrue(count == 3); assertTrue(count == 3);
// Do a simple TTL test.
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttlInSeconds = 1; final int ttlInSeconds = 1;
for (Store store: this.r.stores.values()) { for (Store store: this.r.stores.values()) {
store.ttl = ttlInSeconds * 1000; store.ttl = ttlInSeconds * 1000;
} }
Thread.sleep(ttlInSeconds * 1000); Thread.sleep(ttlInSeconds * 1000);
r.compactStores(true); r.compactStores(true);
count = count(); count = count();
assertTrue(count == 0); assertTrue(count == 0);
} }
/**
* Verify that you can stop a long-running compaction
* (used during RS shutdown)
* @throws Exception
*/
public void testInterruptCompaction() throws Exception {
assertEquals(0, count());
// lower the polling interval for this test
int origWI = Store.closeCheckInterval;
Store.closeCheckInterval = 10*1000; // 10 KB
try {
// Create a couple store files w/ 15KB (over 10KB interval)
int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
byte [] pad = new byte[1000]; // 1 KB chunk
for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
HRegionIncommon loader = new HRegionIncommon(r);
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
for (int j = 0; j < jmax; j++) {
p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
}
addContent(loader, Bytes.toString(COLUMN_FAMILY));
loader.put(p);
loader.flushcache();
}
HRegion spyR = spy(r);
doAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
return invocation.callRealMethod();
}
}).when(spyR).doRegionCompactionPrep();
// force a minor compaction, but not before requesting a stop
spyR.compactStores();
// ensure that the compaction stopped, all old files are intact,
Store s = r.stores.get(COLUMN_FAMILY);
assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
// and no new store files persisted past compactStores()
FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
assertEquals(0, ls.length);
} finally {
// don't mess up future tests
r.writestate.writesEnabled = true;
Store.closeCheckInterval = origWI;
// Delete all Store information once done using
for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
byte [][] famAndQf = {COLUMN_FAMILY, null};
delete.deleteFamily(famAndQf[0]);
r.delete(delete, null, true);
}
r.flushcache();
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttlInSeconds = 1;
for (Store store: this.r.stores.values()) {
store.ttl = ttlInSeconds * 1000;
}
Thread.sleep(ttlInSeconds * 1000);
r.compactStores(true);
assertEquals(0, count());
}
}
private int count() throws IOException { private int count() throws IOException {
int count = 0; int count = 0;
for (StoreFile f: this.r.stores. for (StoreFile f: this.r.stores.