HBASE-10015 Replace intrinsic locking with explicit locks in StoreScanner
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1545838 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
60c3ce0081
commit
260fa3a539
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -96,6 +97,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
|
|
||||||
// A flag whether use pread for scan
|
// A flag whether use pread for scan
|
||||||
private boolean scanUsePread = false;
|
private boolean scanUsePread = false;
|
||||||
|
private ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
private final long readPt;
|
private final long readPt;
|
||||||
|
|
||||||
|
@ -354,11 +356,16 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized KeyValue peek() {
|
public KeyValue peek() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
if (this.heap == null) {
|
if (this.heap == null) {
|
||||||
return this.lastTop;
|
return this.lastTop;
|
||||||
}
|
}
|
||||||
return this.heap.peek();
|
return this.heap.peek();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -368,7 +375,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() {
|
public void close() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
if (this.closing) return;
|
if (this.closing) return;
|
||||||
this.closing = true;
|
this.closing = true;
|
||||||
// under test, we dont have a this.store
|
// under test, we dont have a this.store
|
||||||
|
@ -378,13 +387,21 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
this.heap.close();
|
this.heap.close();
|
||||||
this.heap = null; // CLOSED!
|
this.heap = null; // CLOSED!
|
||||||
this.lastTop = null; // If both are null, we are closed.
|
this.lastTop = null; // If both are null, we are closed.
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean seek(KeyValue key) throws IOException {
|
public boolean seek(KeyValue key) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
// reset matcher state, in case that underlying store changed
|
// reset matcher state, in case that underlying store changed
|
||||||
checkReseek();
|
checkReseek();
|
||||||
return this.heap.seek(key);
|
return this.heap.seek(key);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -394,7 +411,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
* @return true if there are more rows, false if scanner is done
|
* @return true if there are more rows, false if scanner is done
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean next(List<Cell> outResult, int limit) throws IOException {
|
public boolean next(List<Cell> outResult, int limit) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
if (checkReseek()) {
|
if (checkReseek()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -530,16 +549,21 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
// No more keys
|
// No more keys
|
||||||
close();
|
close();
|
||||||
return false;
|
return false;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean next(List<Cell> outResult) throws IOException {
|
public boolean next(List<Cell> outResult) throws IOException {
|
||||||
return next(outResult, -1);
|
return next(outResult, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implementation of ChangedReadersObserver
|
// Implementation of ChangedReadersObserver
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateReaders() throws IOException {
|
public void updateReaders() throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
if (this.closing) return;
|
if (this.closing) return;
|
||||||
|
|
||||||
// All public synchronized API calls will call 'checkReseek' which will cause
|
// All public synchronized API calls will call 'checkReseek' which will cause
|
||||||
|
@ -559,6 +583,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
|
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
|
||||||
|
|
||||||
// Let the next() call handle re-creating and seeking
|
// Let the next() call handle re-creating and seeking
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -622,7 +649,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean reseek(KeyValue kv) throws IOException {
|
public boolean reseek(KeyValue kv) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
//Heap will not be null, if this is called from next() which.
|
//Heap will not be null, if this is called from next() which.
|
||||||
//If called from RegionScanner.reseek(...) make sure the scanner
|
//If called from RegionScanner.reseek(...) make sure the scanner
|
||||||
//stack is reset if needed.
|
//stack is reset if needed.
|
||||||
|
@ -631,6 +660,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
return heap.requestSeek(kv, true, useRowColBloom);
|
return heap.requestSeek(kv, true, useRowColBloom);
|
||||||
}
|
}
|
||||||
return heap.reseek(kv);
|
return heap.reseek(kv);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue