HBASE-12457 Regions in transition for a long time when CLOSE interleaves with a slow compaction.

This commit is contained in:
Lars Hofhansl 2014-11-12 22:49:00 -08:00
parent b25e5bb890
commit 0e795c1cf8
6 changed files with 189 additions and 4 deletions

View File

@ -250,6 +250,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
REPLAY_BATCH_MUTATE, COMPACT_REGION
}
private final Map<Thread, Store> currentCompactions = Maps.newConcurrentMap();
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@ -779,6 +781,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// A region can be reopened if failed a split; reset flags
this.closing.set(false);
this.closed.set(false);
this.writestate.writesEnabled = true;
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-open hooks");
@ -1182,7 +1185,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// region.
writestate.writesEnabled = false;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
// give compactions 30s to finish before we start to interrupt
waitForFlushesAndCompactions(30000);
}
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
@ -1309,6 +1313,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* Exposed for TESTING.
*/
public void waitForFlushesAndCompactions() {
waitForFlushesAndCompactions(0);
}
/**
* Wait for all current flushes and compactions of the region to complete.
* <p>
* Exposed for TESTING.
*/
public void waitForFlushesAndCompactions(long millis) {
synchronized (writestate) {
boolean interrupted = false;
try {
@ -1316,7 +1329,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
LOG.debug("waiting for " + writestate.compacting + " compactions"
+ (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
try {
writestate.wait();
long start = EnvironmentEdgeManager.currentTime();
writestate.wait(millis);
if (millis > 0 && EnvironmentEdgeManager.currentTime() - start >= millis) {
// if we waited once for compactions to finish, interrupt them, and try again
if (LOG.isDebugEnabled()) {
LOG.debug("Waited for " + millis
+ " ms for compactions to finish on close. Interrupting "
+ currentCompactions.size() + " compactions.");
}
for (Thread t : currentCompactions.keySet()) {
// interrupt any current IO in the currently running compactions.
t.interrupt();
}
millis = 0;
}
} catch (InterruptedException iex) {
// essentially ignore and propagate the interrupt back up
LOG.warn("Interrupted while waiting");
@ -5763,7 +5790,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(12 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
@ -6338,6 +6365,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
: (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
}
public void reportCompactionStart(Store store) {
currentCompactions.put(Thread.currentThread(), store);
}
public void reportCompactionEnd(Store store) {
currentCompactions.remove(Thread.currentThread());
}
public void reportCompactionRequestStart(boolean isMajor){
(isMajor ? majorInProgress : minorInProgress).incrementAndGet();
}

View File

@ -2242,4 +2242,14 @@ public class HStore implements Store {
public void deregisterChildren(ConfigurationManager manager) {
// No children to deregister
}
@Override
public void reportCompactionStart() {
getHRegion().reportCompactionStart(this);
}
@Override
public void reportCompactionEnd() {
getHRegion().reportCompactionEnd(this);
}
}

View File

@ -782,7 +782,6 @@ public class SplitTransaction {
break;
case CREATE_SPLIT_DIR:
this.parent.writestate.writesEnabled = true;
this.parent.getRegionFileSystem().cleanupSplitsDir();
break;

View File

@ -398,4 +398,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* @throws IOException
*/
void refreshStoreFiles() throws IOException;
/**
* report the beginning of a compaction
* this must be called from the thread performing the compaction
*/
void reportCompactionStart();
/**
* report the completion of a compaction
* this must be called from the thread performing the compaction
*/
void reportCompactionEnd();
}

View File

@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@ -57,6 +58,7 @@ public class DefaultCompactor extends Compactor {
boolean cleanSeqId = false;
IOException e = null;
try {
store.reportCompactionStart();
InternalScanner scanner = null;
try {
/* Include deletes, unless we are doing a compaction of all files */
@ -108,6 +110,7 @@ public class DefaultCompactor extends Compactor {
newFiles.add(writer.getPath());
}
}
store.reportCompactionEnd();
}
return newFiles;
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test compaction IO cancellation.
*/
@Category(MediumTests.class)
public class TestCompactionIO {
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
private static final CountDownLatch latch = new CountDownLatch(1);
/**
* verify that a compaction stuck in IO is aborted when we attempt to close a region
* @throws Exception
*/
@Test
public void testInterruptCompactionIO() throws Exception {
byte [] STARTROW = Bytes.toBytes(START_KEY);
byte [] COLUMN_FAMILY = fam1;
Configuration conf = UTIL.getConfiguration();
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, BlockedCompactor.class.getName());
int compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
final HRegion r = UTIL.createLocalHRegion(UTIL.createTableDescriptor("TestCompactionIO"), null, null);
//Create a couple store files w/ 15KB (over 10KB interval)
int jmax = (int) Math.ceil(15.0/compactionThreshold);
byte [] pad = new byte[1000]; // 1 KB chunk
for (int i = 0; i < compactionThreshold; i++) {
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
p.setDurability(Durability.SKIP_WAL);
for (int j = 0; j < jmax; j++) {
p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
}
UTIL.loadRegion(r, COLUMN_FAMILY);
r.put(p);
r.flushcache();
}
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
Thread.sleep(1000);
r.close();
} catch (Exception x) {
throw new RuntimeException(x);
}
}
}).start();
// hangs
r.compactStores();
// ensure that the compaction stopped, all old files are intact,
Store s = r.stores.get(COLUMN_FAMILY);
assertEquals(compactionThreshold, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
// and no new store files persisted past compactStores()
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
// this is happening after the compaction start, the DefaultCompactor does not
// clean tmp files when it encounters an IOException. Should it?
assertEquals(1, ls.length);
}
public static class BlockedCompactor extends DefaultCompactor {
public BlockedCompactor(final Configuration conf, final Store store) {
super(conf, store);
}
@Override
protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
CellSink myWriter = new CellSink() {
@Override
public void append(Cell cell) throws IOException {
try {
Thread.sleep(100000);
} catch (InterruptedException ie) {
throw new InterruptedIOException(ie.getMessage());
}
}
};
latch.countDown();
return super.performCompaction(scanner, myWriter, smallestReadPoint, cleanSeqId);
}
}
}