HBASE-2503 PriorityQueue isn't thread safe, KeyValueHeap uses it that way
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@942215 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e06e00450e
commit
6dae1e055b
|
@ -303,6 +303,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2482 regions in transition do not get reassigned by master when RS
|
HBASE-2482 regions in transition do not get reassigned by master when RS
|
||||||
crashes (Todd Lipcon via Stack)
|
crashes (Todd Lipcon via Stack)
|
||||||
HBASE-2513 hbase-2414 added bug where we'd tight-loop if no root available
|
HBASE-2513 hbase-2414 added bug where we'd tight-loop if no root available
|
||||||
|
HBASE-2503 PriorityQueue isn't thread safe, KeyValueHeap uses it that way
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -34,6 +34,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
@ -1824,6 +1825,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
private Filter filter;
|
private Filter filter;
|
||||||
private List<KeyValue> results = new ArrayList<KeyValue>();
|
private List<KeyValue> results = new ArrayList<KeyValue>();
|
||||||
private int batch;
|
private int batch;
|
||||||
|
// Doesn't need to be volatile, always accessed under a sync'ed method
|
||||||
|
private boolean filterClosed = false;
|
||||||
|
|
||||||
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
|
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
|
||||||
this.filter = scan.getFilter();
|
this.filter = scan.getFilter();
|
||||||
|
@ -1858,7 +1861,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean next(List<KeyValue> outResults, int limit) throws IOException {
|
public synchronized boolean next(List<KeyValue> outResults, int limit)
|
||||||
|
throws IOException {
|
||||||
|
if (this.filterClosed) {
|
||||||
|
throw new UnknownScannerException("Scanner was closed (timed out?) " +
|
||||||
|
"after we renewed it. Could be caused by a very slow scanner " +
|
||||||
|
"or a lengthy garbage collection");
|
||||||
|
}
|
||||||
if (closing.get() || closed.get()) {
|
if (closing.get() || closed.get()) {
|
||||||
close();
|
close();
|
||||||
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
|
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
|
||||||
|
@ -1877,7 +1886,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
return returnResult;
|
return returnResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean next(List<KeyValue> outResults) throws IOException {
|
public synchronized boolean next(List<KeyValue> outResults)
|
||||||
|
throws IOException {
|
||||||
// apply the batching limit by default
|
// apply the batching limit by default
|
||||||
return next(outResults, batch);
|
return next(outResults, batch);
|
||||||
}
|
}
|
||||||
|
@ -1885,7 +1895,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
/*
|
/*
|
||||||
* @return True if a filter rules the scanner is over, done.
|
* @return True if a filter rules the scanner is over, done.
|
||||||
*/
|
*/
|
||||||
boolean isFilterDone() {
|
synchronized boolean isFilterDone() {
|
||||||
return this.filter != null && this.filter.filterAllRemaining();
|
return this.filter != null && this.filter.filterAllRemaining();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1955,24 +1965,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public synchronized void close() {
|
||||||
storeHeap.close();
|
storeHeap.close();
|
||||||
}
|
this.filterClosed = true;
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param scanner to be closed
|
|
||||||
*/
|
|
||||||
public void close(KeyValueScanner scanner) {
|
|
||||||
try {
|
|
||||||
scanner.close();
|
|
||||||
} catch(NullPointerException npe) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the current storeHeap
|
* @return the current storeHeap
|
||||||
*/
|
*/
|
||||||
public KeyValueHeap getStoreHeap() {
|
public synchronized KeyValueHeap getStoreHeap() {
|
||||||
return this.storeHeap;
|
return this.storeHeap;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HServerAddress;
|
import org.apache.hadoop.hbase.HServerAddress;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -195,6 +196,34 @@ public class TestScanner extends HBaseTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that closing a scanner while a client is using it doesn't throw
|
||||||
|
* NPEs but instead a UnknownScannerException. HBASE-2503
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testRaceBetweenClientAndTimeout() throws Exception {
|
||||||
|
try {
|
||||||
|
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
|
||||||
|
addContent(this.r, HConstants.CATALOG_FAMILY);
|
||||||
|
Scan scan = new Scan();
|
||||||
|
InternalScanner s = r.getScanner(scan);
|
||||||
|
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||||
|
try {
|
||||||
|
s.next(results);
|
||||||
|
s.close();
|
||||||
|
s.next(results);
|
||||||
|
fail("We don't want anything more, we should be failing");
|
||||||
|
} catch (UnknownScannerException ex) {
|
||||||
|
// ok!
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.r.close();
|
||||||
|
this.r.getLog().closeAndDelete();
|
||||||
|
shutdownDfs(this.cluster);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** The test!
|
/** The test!
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue