HBASE-810 Prevent temporary deadlocks when, during a scan with write operations, the region splits (Jean-Daniel Cryans via Jim Kellerman)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@687869 13f79535-47bb-0310-9956-ffa450edef68
parent ed98d6bc25
commit 3e28fa30b7
@@ -29,6 +29,9 @@ Release 0.3.0 - Unreleased
   HBASE-831   committing BatchUpdate with no row should complain
               (Andrew Purtell via Jim Kellerman)
   HBASE-833   Doing an insert with an unknown family throws a NPE in HRS
   HBASE-810   Prevent temporary deadlocks when, during a scan with write
               operations, the region splits (Jean-Daniel Cryans via Jim
               Kellerman)

  IMPROVEMENTS
   HBASE-801   When a table haven't disable, shell could response in a "user
@@ -372,6 +372,9 @@ public class HRegion implements HConstants {
  // Used to guard splits and closes
  private final ReentrantReadWriteLock splitsAndClosesLock =
    new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock newScannerLock =
    new ReentrantReadWriteLock();

  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
@@ -583,8 +586,8 @@ public class HRegion implements HConstants {
        }
      }
    }
    splitsAndClosesLock.writeLock().lock();
    LOG.debug("Updates and scanners disabled for region " + this);
    newScannerLock.writeLock().lock();
    LOG.debug("Scanners disabled for region " + this);
    try {
      // Wait for active scanners to finish. The write lock we hold will
      // prevent new scanners from being created.
@@ -600,28 +603,33 @@ public class HRegion implements HConstants {
        }
      }
      LOG.debug("No more active scanners for region " + this);

      // Write lock means no more row locks can be given out. Wait on
      // outstanding row locks to come in before we close so we do not drop
      // outstanding updates.
      waitOnRowLocks();
      LOG.debug("No more row locks outstanding on region " + this);

      // Don't flush the cache if we are aborting
      if (!abort) {
        internalFlushcache();
      splitsAndClosesLock.writeLock().lock();
      LOG.debug("Updates disabled for region " + this);
      try {
        // Write lock means no more row locks can be given out. Wait on
        // outstanding row locks to come in before we close so we do not drop
        // outstanding updates.
        waitOnRowLocks();
        LOG.debug("No more row locks outstanding on region " + this);

        // Don't flush the cache if we are aborting
        if (!abort) {
          internalFlushcache();
        }

        List<HStoreFile> result = new ArrayList<HStoreFile>();
        for (HStore store: stores.values()) {
          result.addAll(store.close());
        }
        this.closed.set(true);

        LOG.info("closed " + this);
        return result;
      } finally {
        splitsAndClosesLock.writeLock().unlock();
      }

      List<HStoreFile> result = new ArrayList<HStoreFile>();
      for (HStore store: stores.values()) {
        result.addAll(store.close());
      }
      this.closed.set(true);

      LOG.info("closed " + this);
      return result;
    } finally {
      splitsAndClosesLock.writeLock().unlock();
      newScannerLock.writeLock().unlock();
    }
  }
}
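Read together, the two close() hunks above invert the shutdown ordering: instead of taking splitsAndClosesLock's write lock up front and then waiting for active scanners (a scanner that still wants to write can then stall the close, which appears to be the temporary deadlock HBASE-810 describes), the region now blocks only new scanner creation via newScannerLock, lets active scanners drain, and only afterwards takes splitsAndClosesLock to stop updates and close the stores. A minimal standalone sketch of that ordering, where only the two lock fields mirror the diff and the helper methods are hypothetical stand-ins:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative model only -- not the actual HRegion code.
    class RegionCloseSketch {
      // Field names mirror the patch; everything else here is hypothetical.
      private final ReentrantReadWriteLock splitsAndClosesLock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock();

      void close(boolean abort) {
        // Step 1: block only the creation of new scanners. Existing scanners,
        // and any writes they are waiting on, can still make progress.
        newScannerLock.writeLock().lock();
        try {
          waitOnScanners();
          // Step 2: no scanner is outstanding any more, so stop updates and close.
          splitsAndClosesLock.writeLock().lock();
          try {
            waitOnRowLocks();
            if (!abort) {
              flushCache();
            }
            closeStores();
          } finally {
            splitsAndClosesLock.writeLock().unlock();
          }
        } finally {
          newScannerLock.writeLock().unlock();
        }
      }

      // Hypothetical stand-ins for the real region internals.
      private void waitOnScanners() {}
      private void waitOnRowLocks() {}
      private void flushCache() {}
      private void closeStores() {}
    }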
@@ -867,45 +875,50 @@ public class HRegion implements HConstants {
   * @throws IOException
   */
  private byte [] compactStores(final boolean force) throws IOException {
    byte [] midKey = null;
    if (this.closed.get()) {
      return midKey;
    }
    splitsAndClosesLock.readLock().lock();
    try {
      synchronized (writestate) {
        if (!writestate.compacting && writestate.writesEnabled) {
          writestate.compacting = true;
        } else {
          LOG.info("NOT compacting region " + this +
            ": compacting=" + writestate.compacting + ", writesEnabled=" +
            writestate.writesEnabled);
          return midKey;
    byte [] midKey = null;
    if (this.closed.get()) {
      return midKey;
    }
    try {
      synchronized (writestate) {
        if (!writestate.compacting && writestate.writesEnabled) {
          writestate.compacting = true;
        } else {
          LOG.info("NOT compacting region " + this +
            ": compacting=" + writestate.compacting + ", writesEnabled=" +
            writestate.writesEnabled);
          return midKey;
        }
      }
      LOG.info("starting compaction on region " + this);
      long startTime = System.currentTimeMillis();
      doRegionCompactionPrep();
      long maxSize = -1;
      for (HStore store: stores.values()) {
        final HStore.StoreSize size = store.compact(force);
        if (size != null && size.getSize() > maxSize) {
          maxSize = size.getSize();
          midKey = size.getKey();
        }
      }
      doRegionCompactionCleanup();
      String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(),
          startTime);
      LOG.info("compaction completed on region " + this + " in " + timeTaken);

      this.historian.addRegionCompaction(regionInfo, timeTaken);
    } finally {
      synchronized (writestate) {
        writestate.compacting = false;
        writestate.notifyAll();
      }
    }
      LOG.info("starting compaction on region " + this);
      long startTime = System.currentTimeMillis();
      doRegionCompactionPrep();
      long maxSize = -1;
      for (HStore store: stores.values()) {
        final HStore.StoreSize size = store.compact(force);
        if (size != null && size.getSize() > maxSize) {
          maxSize = size.getSize();
          midKey = size.getKey();
        }
      }
      doRegionCompactionCleanup();
      String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(),
          startTime);
      LOG.info("compaction completed on region " + this + " in " + timeTaken);

      this.historian.addRegionCompaction(regionInfo, timeTaken);
      return midKey;
    } finally {
      synchronized (writestate) {
        writestate.compacting = false;
        writestate.notifyAll();
      }
      splitsAndClosesLock.readLock().unlock();
    }
    return midKey;
  }

  /**
@@ -1143,15 +1156,20 @@ public class HRegion implements HConstants {
  public Cell[] get(byte [] row, byte [] column, long timestamp,
      int numVersions)
  throws IOException {
    if (this.closed.get()) {
      throw new IOException("Region " + this + " closed");
    splitsAndClosesLock.readLock().lock();
    try {
      if (this.closed.get()) {
        throw new IOException("Region " + this + " closed");
      }
      // Make sure this is a valid row and valid column
      checkRow(row);
      checkColumn(column);
      // Don't need a row lock for a simple get
      HStoreKey key = new HStoreKey(row, column, timestamp);
      return getStore(column).get(key, numVersions);
    } finally {
      splitsAndClosesLock.readLock().unlock();
    }
    // Make sure this is a valid row and valid column
    checkRow(row);
    checkColumn(column);
    // Don't need a row lock for a simple get
    HStoreKey key = new HStoreKey(row, column, timestamp);
    return getStore(column).get(key, numVersions);
  }

  /**
@@ -1321,7 +1339,7 @@ public class HRegion implements HConstants {
  public InternalScanner getScanner(byte[][] cols, byte [] firstRow,
      long timestamp, RowFilterInterface filter)
  throws IOException {
    splitsAndClosesLock.readLock().lock();
    newScannerLock.readLock().lock();
    try {
      if (this.closed.get()) {
        throw new IOException("Region " + this + " closed");
@@ -1336,7 +1354,7 @@ public class HRegion implements HConstants {
      return new HScanner(cols, firstRow, timestamp,
        storeSet.toArray(new HStore [storeSet.size()]), filter);
    } finally {
      splitsAndClosesLock.readLock().unlock();
      newScannerLock.readLock().unlock();
    }
  }
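The get() and getScanner() hunks above show the matching reader-side guards: a plain get now takes splitsAndClosesLock's read lock around the lookup, while scanner creation is fenced by newScannerLock instead, so an already-open scanner no longer pins the lock a pending close or split is waiting on. A small sketch of that guard pattern, again with hypothetical method bodies (only the lock names and closed flag follow the diff):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative guard pattern only -- not the real HRegion methods.
    class RegionReadSketch {
      private final ReentrantReadWriteLock splitsAndClosesLock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock();
      private final AtomicBoolean closed = new AtomicBoolean(false);

      byte[] get(byte[] row) throws IOException {
        // A simple get only needs the region to stay open while it reads.
        splitsAndClosesLock.readLock().lock();
        try {
          if (closed.get()) {
            throw new IOException("Region closed");
          }
          return doGet(row);            // hypothetical store lookup
        } finally {
          splitsAndClosesLock.readLock().unlock();
        }
      }

      Object getScanner(byte[] firstRow) throws IOException {
        // Scanner creation is fenced by its own lock, so open scanners do not
        // block a close that is still waiting for them to drain.
        newScannerLock.readLock().lock();
        try {
          if (closed.get()) {
            throw new IOException("Region closed");
          }
          return openScanner(firstRow); // hypothetical scanner constructor
        } finally {
          newScannerLock.readLock().unlock();
        }
      }

      private byte[] doGet(byte[] row) { return null; }
      private Object openScanner(byte[] firstRow) { return new Object(); }
    }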
@@ -63,7 +63,6 @@ implements ChangedReadersObserver {
    super(timestamp, targetCols);
    this.store = store;
    this.store.addChangedReaderObserver(this);
    this.store.lock.readLock().lock();
    try {
      openReaders(firstRow);
    } catch (Exception ex) {
@@ -71,8 +70,6 @@ implements ChangedReadersObserver {
      IOException e = new IOException("HStoreScanner failed construction");
      e.initCause(ex);
      throw e;
    } finally {
      this.store.lock.readLock().unlock();
    }
  }
@@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
@@ -52,7 +55,9 @@ public class TestSplit extends HBaseClusterTestCase {
    // Make lease timeout longer, lease checks less frequent
    conf.setInt("hbase.master.lease.period", 10 * 1000);
    conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);

    conf.setInt("hbase.regionserver.lease.period", 10 * 1000);

    // Increase the amount of time between client retries
    conf.setLong("hbase.client.pause", 15 * 1000);
@@ -84,6 +89,57 @@ public class TestSplit extends HBaseClusterTestCase {
    }
  }

  /**
   * Test for HBASE-810
   * @throws Exception
   */
  public void testScanSplitOnRegion() throws Exception {
    HRegion region = null;
    try {
      HTableDescriptor htd = createTableDescriptor(getName());
      htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
      region = createNewHRegion(htd, null, null);
      addContent(region, COLFAMILY_NAME3);
      region.flushcache();
      final byte [] midkey = region.compactStores();
      assertNotNull(midkey);
      byte [][] cols = {COLFAMILY_NAME3};
      final InternalScanner s = region.getScanner(cols,
        HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
      final HRegion regionForThread = region;

      Thread splitThread = new Thread() {
        public void run() {
          try {
            HRegion [] regions = split(regionForThread, midkey);
          } catch (IOException e) {
            fail("Unexpected exception " + e);
          }
        }
      };
      splitThread.start();
      HRegionServer server = cluster.getRegionThreads().get(0).getRegionServer();
      long id = server.addScanner(s);
      for(int i = 0; i < 6; i++) {
        try {
          BatchUpdate update =
            new BatchUpdate(region.getRegionInfo().getStartKey());
          update.put(COLFAMILY_NAME3, Bytes.toBytes("val"));
          region.batchUpdate(update);
          Thread.sleep(1000);
        }
        catch (InterruptedException e) {
          fail("Unexpected exception " + e);
        }
      }
      server.next(id);
      server.close(id);
    } catch(UnknownScannerException ex) {
      ex.printStackTrace();
      fail("Got the " + ex);
    }
  }

  private void basicSplit(final HRegion region) throws Exception {
    addContent(region, COLFAMILY_NAME3);
    region.flushcache();