HBASE-14575 Relax region read lock for compactions (Nick and Ted)
This commit is contained in:
parent
92e178df28
commit
d8b30b8925
|
@ -1793,8 +1793,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
MonitoredTask status = null;
|
MonitoredTask status = null;
|
||||||
boolean requestNeedsCancellation = true;
|
boolean requestNeedsCancellation = true;
|
||||||
// block waiting for the lock for compaction
|
/*
|
||||||
lock.readLock().lock();
|
* We are trying to remove / relax the region read lock for compaction.
|
||||||
|
* Let's see what are the potential race conditions among the operations (user scan,
|
||||||
|
* region split, region close and region bulk load).
|
||||||
|
*
|
||||||
|
* user scan ---> region read lock
|
||||||
|
* region split --> region close first --> region write lock
|
||||||
|
* region close --> region write lock
|
||||||
|
* region bulk load --> region write lock
|
||||||
|
*
|
||||||
|
* read lock is compatible with read lock. ---> no problem with user scan/read
|
||||||
|
* region bulk load does not cause problem for compaction (no consistency problem, store lock
|
||||||
|
* will help the store file accounting).
|
||||||
|
* They can run almost concurrently at the region level.
|
||||||
|
*
|
||||||
|
* The only remaining race condition is between the region close and compaction.
|
||||||
|
* So we will evaluate, below, how region close intervenes with compaction if compaction does
|
||||||
|
* not acquire region read lock.
|
||||||
|
*
|
||||||
|
* Here are the steps for compaction:
|
||||||
|
* 1. obtain list of StoreFile's
|
||||||
|
* 2. create StoreFileScanner's based on list from #1
|
||||||
|
* 3. perform compaction and save resulting files under tmp dir
|
||||||
|
* 4. swap in compacted files
|
||||||
|
*
|
||||||
|
* #1 is guarded by store lock. This patch does not change this --> no worse or better
|
||||||
|
* For #2, we obtain smallest read point (for region) across all the Scanners (for both default
|
||||||
|
* compactor and stripe compactor).
|
||||||
|
* The read points are for user scans. Region keeps the read points for all currently open
|
||||||
|
* user scanners.
|
||||||
|
* Compaction needs to know the smallest read point so that during re-write of the hfiles,
|
||||||
|
* it can remove the mvcc points for the cells if their mvccs are older than the smallest
|
||||||
|
* since they are not needed anymore.
|
||||||
|
* This will not conflict with compaction.
|
||||||
|
* For #3, it can be performed in parallel to other operations.
|
||||||
|
* For #4 bulk load and compaction don't conflict with each other on the region level
|
||||||
|
* (for multi-family atomicy).
|
||||||
|
* Region close and compaction are guarded pretty well by the 'writestate'.
|
||||||
|
* In HRegion#doClose(), we have :
|
||||||
|
* synchronized (writestate) {
|
||||||
|
* // Disable compacting and flushing by background threads for this
|
||||||
|
* // region.
|
||||||
|
* canFlush = !writestate.readOnly;
|
||||||
|
* writestate.writesEnabled = false;
|
||||||
|
* LOG.debug("Closing " + this + ": disabling compactions & flushes");
|
||||||
|
* waitForFlushesAndCompactions();
|
||||||
|
* }
|
||||||
|
* waitForFlushesAndCompactions() would wait for writestate.compacting to come down to 0.
|
||||||
|
* and in HRegion.compact()
|
||||||
|
* try {
|
||||||
|
* synchronized (writestate) {
|
||||||
|
* if (writestate.writesEnabled) {
|
||||||
|
* wasStateSet = true;
|
||||||
|
* ++writestate.compacting;
|
||||||
|
* } else {
|
||||||
|
* String msg = "NOT compacting region " + this + ". Writes disabled.";
|
||||||
|
* LOG.info(msg);
|
||||||
|
* status.abort(msg);
|
||||||
|
* return false;
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* Also in compactor.performCompaction():
|
||||||
|
* check periodically to see if a system stop is requested
|
||||||
|
* if (closeCheckInterval > 0) {
|
||||||
|
* bytesWritten += len;
|
||||||
|
* if (bytesWritten > closeCheckInterval) {
|
||||||
|
* bytesWritten = 0;
|
||||||
|
* if (!store.areWritesEnabled()) {
|
||||||
|
* progress.cancel();
|
||||||
|
* return false;
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
*/
|
||||||
try {
|
try {
|
||||||
byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
|
byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
|
||||||
if (stores.get(cf) != store) {
|
if (stores.get(cf) != store) {
|
||||||
|
@ -1852,12 +1924,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
status.markComplete("Compaction complete");
|
status.markComplete("Compaction complete");
|
||||||
return true;
|
return true;
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
|
||||||
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
|
if (status != null) status.cleanup();
|
||||||
if (status != null) status.cleanup();
|
|
||||||
} finally {
|
|
||||||
lock.readLock().unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import static org.hamcrest.core.Is.is;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -42,6 +52,9 @@ import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
|
@ -60,24 +73,20 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALKey;
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import static org.hamcrest.core.Is.is;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
|
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
|
||||||
* the region server's bullkLoad functionality.
|
* the region server's bullkLoad functionality.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
@Category({RegionServerTests.class, LargeTests.class})
|
@Category({RegionServerTests.class, LargeTests.class})
|
||||||
public class TestHRegionServerBulkLoad {
|
public class TestHRegionServerBulkLoad {
|
||||||
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
|
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
|
||||||
|
@ -85,6 +94,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
private final static Configuration conf = UTIL.getConfiguration();
|
private final static Configuration conf = UTIL.getConfiguration();
|
||||||
private final static byte[] QUAL = Bytes.toBytes("qual");
|
private final static byte[] QUAL = Bytes.toBytes("qual");
|
||||||
private final static int NUM_CFS = 10;
|
private final static int NUM_CFS = 10;
|
||||||
|
private int sleepDuration;
|
||||||
public static int BLOCKSIZE = 64 * 1024;
|
public static int BLOCKSIZE = 64 * 1024;
|
||||||
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
|
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
|
||||||
|
|
||||||
|
@ -94,6 +104,24 @@ public class TestHRegionServerBulkLoad {
|
||||||
families[i] = Bytes.toBytes(family(i));
|
families[i] = Bytes.toBytes(family(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@Parameters
|
||||||
|
public static final Collection<Object[]> parameters() {
|
||||||
|
int[] sleepDurations = new int[] { 0, 30000 };
|
||||||
|
List<Object[]> configurations = new ArrayList<Object[]>();
|
||||||
|
for (int i : sleepDurations) {
|
||||||
|
configurations.add(new Object[] { i });
|
||||||
|
}
|
||||||
|
return configurations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestHRegionServerBulkLoad(int duration) {
|
||||||
|
this.sleepDuration = duration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
conf.setInt("hbase.rpc.timeout", 10 * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a rowkey compatible with
|
* Create a rowkey compatible with
|
||||||
|
@ -189,8 +217,8 @@ public class TestHRegionServerBulkLoad {
|
||||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||||
|
|
||||||
// Periodically do compaction to reduce the number of open file handles.
|
// Periodically do compaction to reduce the number of open file handles.
|
||||||
if (numBulkLoads.get() % 10 == 0) {
|
if (numBulkLoads.get() % 5 == 0) {
|
||||||
// 10 * 50 = 500 open file handles!
|
// 5 * 50 = 250 open file handles!
|
||||||
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
|
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
|
||||||
@Override
|
@Override
|
||||||
public Void call(int callTimeout) throws Exception {
|
public Void call(int callTimeout) throws Exception {
|
||||||
|
@ -211,6 +239,23 @@ public class TestHRegionServerBulkLoad {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class MyObserver extends BaseRegionObserver {
|
||||||
|
static int sleepDuration;
|
||||||
|
@Override
|
||||||
|
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||||
|
final Store store, final InternalScanner scanner, final ScanType scanType)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
Thread.sleep(sleepDuration);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
IOException ioe = new InterruptedIOException();
|
||||||
|
ioe.initCause(ie);
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread that does full scans of the table looking for any partially
|
* Thread that does full scans of the table looking for any partially
|
||||||
* completed rows.
|
* completed rows.
|
||||||
|
@ -278,6 +323,8 @@ public class TestHRegionServerBulkLoad {
|
||||||
try {
|
try {
|
||||||
LOG.info("Creating table " + table);
|
LOG.info("Creating table " + table);
|
||||||
HTableDescriptor htd = new HTableDescriptor(table);
|
HTableDescriptor htd = new HTableDescriptor(table);
|
||||||
|
htd.addCoprocessor(MyObserver.class.getName());
|
||||||
|
MyObserver.sleepDuration = this.sleepDuration;
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
htd.addFamily(new HColumnDescriptor(family(i)));
|
htd.addFamily(new HColumnDescriptor(family(i)));
|
||||||
}
|
}
|
||||||
|
@ -348,7 +395,7 @@ public class TestHRegionServerBulkLoad {
|
||||||
public static void main(String args[]) throws Exception {
|
public static void main(String args[]) throws Exception {
|
||||||
try {
|
try {
|
||||||
Configuration c = HBaseConfiguration.create();
|
Configuration c = HBaseConfiguration.create();
|
||||||
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
|
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
|
||||||
test.setConf(c);
|
test.setConf(c);
|
||||||
test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
|
test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue