diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 95c3b9eeae4..15dd8fe499b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -96,14 +96,16 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
       HRegion r = null;
       try {
         r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (r != null && !this.server.isStopped()) {
+        if (r != null) {
           lock.lock();
           try {
-            // Don't interrupt us while we are working
-            byte [] midKey = r.compactStores();
-            if (shouldSplitRegion() && midKey != null &&
-                !this.server.isStopped()) {
-              split(r, midKey);
+            if(!this.server.isStopped()) {
+              // Don't interrupt us while we are working
+              byte [] midKey = r.compactStores();
+              if (shouldSplitRegion() && midKey != null &&
+                  !this.server.isStopped()) {
+                split(r, midKey);
+              }
             }
           } finally {
             lock.unlock();
@@ -208,7 +210,11 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
    */
   void interruptIfNecessary() {
     if (lock.tryLock()) {
-      this.interrupt();
+      try {
+        this.interrupt();
+      } finally {
+        lock.unlock();
+      }
     }
   }
 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0a4fbce99a9..5f829e4f176 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -207,7 +208,7 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
-  private final WriteState writestate = new WriteState();
+  final WriteState writestate = new WriteState();
 
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
@@ -429,6 +430,12 @@ public class HRegion implements HeapSize { // , Writable{
   public boolean isClosing() {
     return this.closing.get();
   }
+
+  boolean areWritesEnabled() {
+    synchronized(this.writestate) {
+      return this.writestate.writesEnabled;
+    }
+  }
 
   public ReadWriteConsistencyControl getRWCC() {
     return rwcc;
@@ -624,7 +631,7 @@ public class HRegion implements HeapSize { // , Writable{
    * Do preparation for pending compaction.
    * @throws IOException
    */
-  private void doRegionCompactionPrep() throws IOException {
+  void doRegionCompactionPrep() throws IOException {
   }
 
   /*
@@ -717,16 +724,24 @@ public class HRegion implements HeapSize { // , Writable{
       long startTime = EnvironmentEdgeManager.currentTimeMillis();
       doRegionCompactionPrep();
       long maxSize = -1;
-      for (Store store: stores.values()) {
-        final Store.StoreSize ss = store.compact(majorCompaction);
-        if (ss != null && ss.getSize() > maxSize) {
-          maxSize = ss.getSize();
-          splitRow = ss.getSplitRow();
+      boolean completed = false;
+      try {
+        for (Store store: stores.values()) {
+          final Store.StoreSize ss = store.compact(majorCompaction);
+          if (ss != null && ss.getSize() > maxSize) {
+            maxSize = ss.getSize();
+            splitRow = ss.getSplitRow();
+          }
         }
+        completed = true;
+      } catch (InterruptedIOException iioe) {
+        LOG.info("compaction interrupted by user: ", iioe);
+      } finally {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        LOG.info(((completed) ? "completed" : "aborted") +
+          " compaction on region " + this +
+          " after " + StringUtils.formatTimeDiff(now, startTime));
       }
-      String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
-        startTime);
-      LOG.info("compaction completed on region " + this + " in " + timeTaken);
     } finally {
       synchronized (writestate) {
        writestate.compacting = false;
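
Taken together, the CompactSplitThread and HRegion hunks above set up one half of a cooperative-cancellation contract: a stop request is recorded by flipping `writestate.writesEnabled` off, a running compaction discovers this through `areWritesEnabled()` and bails out by throwing `InterruptedIOException`, and `compactStores()` treats that exception as a clean abort rather than a failure, logging "completed" or "aborted" accordingly. The Store hunks below supply the polling half. A minimal standalone sketch of the contract follows; the class and member names are illustrative, not HBase API:

    import java.io.InterruptedIOException;

    // Illustrative sketch of the cancellation contract: a closer flips a
    // flag, the worker polls it and aborts via InterruptedIOException, and
    // the caller logs the outcome instead of propagating the abort.
    class CancellableWorker {
      private volatile boolean writesEnabled = true;

      void requestStop() { writesEnabled = false; }  // e.g. on region close

      void doWork() {
        boolean completed = false;
        try {
          for (int step = 0; step < 1000000; step++) {
            // ... one expensive unit of work ...
            if (!writesEnabled) {                    // periodic poll
              throw new InterruptedIOException("user requested stop");
            }
          }
          completed = true;
        } catch (InterruptedIOException expected) {
          // an abort is a clean, expected outcome
        } finally {
          System.out.println((completed ? "completed" : "aborted") + " work");
        }
      }
    }
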
"completed" : "aborted") + + " compaction on region " + this + + " after " + StringUtils.formatTimeDiff(now, startTime)); } - String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), - startTime); - LOG.info("compaction completed on region " + this + " in " + timeTaken); } finally { synchronized (writestate) { writestate.compacting = false; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 881c6e99f93..50261073ce4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -90,6 +91,8 @@ public class Store implements HeapSize { protected long ttl; private long majorCompactionTime; private int maxFilesToCompact; + /* how many bytes to write between status checks */ + static int closeCheckInterval = 0; private final long desiredMaxFileSize; private volatile long storeSize = 0L; private final Object flushLock = new Object(); @@ -192,6 +195,10 @@ public class Store implements HeapSize { } this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); + if (Store.closeCheckInterval == 0) { + Store.closeCheckInterval = conf.getInt( + "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); + } this.storefiles = sortAndClone(loadStoreFiles()); } @@ -813,23 +820,43 @@ public class Store implements HeapSize { // where all source cells are expired or deleted. StoreFile.Writer writer = null; try { + // NOTE: the majority of the time for a compaction is spent in this section if (majorCompaction) { InternalScanner scanner = null; try { Scan scan = new Scan(); scan.setMaxVersions(family.getMaxVersions()); scanner = new StoreScanner(this, scan, scanners); + int bytesWritten = 0; // since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. 
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 2c3747d39ef..b82db8bab1a 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +34,18 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 
 /**
  * Test compactions
 */
@@ -217,17 +224,94 @@ public class TestCompaction extends HBaseTestCase {
     }
     assertTrue(containsStartRow);
     assertTrue(count == 3);
-    // Do a simple TTL test.
+
+    // Multiple versions allowed for an entry, so the delete isn't enough
+    // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttlInSeconds = 1;
     for (Store store: this.r.stores.values()) {
       store.ttl = ttlInSeconds * 1000;
     }
     Thread.sleep(ttlInSeconds * 1000);
 
+    r.compactStores(true);
     count = count();
     assertTrue(count == 0);
   }
+
+  /**
+   * Verify that you can stop a long-running compaction
+   * (used during RS shutdown)
+   * @throws Exception
+   */
+  public void testInterruptCompaction() throws Exception {
+    assertEquals(0, count());
+
+    // lower the polling interval for this test
+    int origWI = Store.closeCheckInterval;
+    Store.closeCheckInterval = 10*1000; // 10 KB
+
+    try {
+      // Create a couple store files w/ 15KB (over 10KB interval)
+      int jmax = (int) Math.ceil(15.0/COMPACTION_THRESHOLD);
+      byte [] pad = new byte[1000]; // 1 KB chunk
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        HRegionIncommon loader = new HRegionIncommon(r);
+        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        for (int j = 0; j < jmax; j++) {
+          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+        }
+        addContent(loader, Bytes.toString(COLUMN_FAMILY));
+        loader.put(p);
+        loader.flushcache();
+      }
+
+      HRegion spyR = spy(r);
+      doAnswer(new Answer() {
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          r.writestate.writesEnabled = false;
+          return invocation.callRealMethod();
+        }
+      }).when(spyR).doRegionCompactionPrep();
+
+      // force a minor compaction, but not before requesting a stop
+      spyR.compactStores();
+
+      // ensure that the compaction stopped, all old files are intact,
+      Store s = r.stores.get(COLUMN_FAMILY);
+      assertEquals(COMPACTION_THRESHOLD, s.getStorefilesCount());
+      assertTrue(s.getStorefilesSize() > 15*1000);
+      // and no new store files persisted past compactStores()
+      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
+      assertEquals(0, ls.length);
+
+    } finally {
+      // don't mess up future tests
+      r.writestate.writesEnabled = true;
+      Store.closeCheckInterval = origWI;
+
+      // Delete all Store information once done using
+      for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        byte [][] famAndQf = {COLUMN_FAMILY, null};
+        delete.deleteFamily(famAndQf[0]);
+        r.delete(delete, null, true);
+      }
+      r.flushcache();
+
+      // Multiple versions allowed for an entry, so the delete isn't enough
+      // Lower TTL and expire to ensure that all our entries have been wiped
+      final int ttlInSeconds = 1;
+      for (Store store: this.r.stores.values()) {
+        store.ttl = ttlInSeconds * 1000;
+      }
+      Thread.sleep(ttlInSeconds * 1000);
+
+      r.compactStores(true);
+      assertEquals(0, count());
+    }
+  }
+
 
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.
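
The test above drives the abort path with a Mockito spy: `doRegionCompactionPrep()` was widened from private to package-private precisely so the spy can intercept it and flip `writesEnabled` off at the last moment before the compaction loop starts. Operationally, the one new knob is `hbase.hstore.close.check.interval`; since `Store.closeCheckInterval` is latched when the first Store is constructed, it is effectively process-wide and must be set before the region server (or mini-cluster) starts. A hedged sketch of setting it programmatically, with the surrounding launch code assumed rather than shown in the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative sketch: lower the close-check interval to 1 MB so a stop
    // request is noticed sooner, at the price of more frequent flag polls.
    public class TuneCloseCheck {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("hbase.hstore.close.check.interval", 1000*1000 /* 1 MB */);
        // ... pass 'conf' to the region server / mini-cluster being launched ...
      }
    }

A smaller interval makes shutdown more responsive; a larger one keeps compactions marginally cheaper. The 10 MB default means a region server may write up to 10 MB per store after a stop request before the compaction notices.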