HBASE-1671 HBASE-1609 broke scanners riding across splits

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@798679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-07-28 19:41:54 +00:00
parent 3086b191dd
commit a68b5ee409
13 changed files with 198 additions and 90 deletions

View File

@ -295,6 +295,7 @@ Release 0.20.0 - Unreleased
(Tim Sell and Ryan Rawson via Stack)
HBASE-1703 ICVs across/during a flush can cause multiple keys with the
same TS (bad)
HBASE-1671 HBASE-1609 broke scanners riding across splits
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -31,9 +31,8 @@ class TransactionScannerCallable extends ScannerCallable {
private TransactionState transactionState;
TransactionScannerCallable(final TransactionState transactionState,
final HConnection connection, final byte[] tableName,
final byte[] startRow, Scan scan) {
super(connection, tableName, startRow, scan);
final HConnection connection, final byte[] tableName, Scan scan) {
super(connection, tableName, scan);
this.transactionState = transactionState;
}

View File

@ -186,7 +186,7 @@ public class TransactionalTable extends HTable {
final byte[] localStartKey, int caching) {
TransactionScannerCallable t =
new TransactionScannerCallable(transactionState, getConnection(),
getTableName(), getScan().getStartRow(), getScan());
getTableName(), getScan());
t.setCaching(caching);
return t;
}

View File

@ -42,4 +42,12 @@ public class DoNotRetryIOException extends IOException {
public DoNotRetryIOException(String message) {
super(message);
}
/**
* Constructs the exception with a detail message and an underlying cause.
* Both arguments are handed unchanged to the superclass constructor.
* @param message detail message for this exception
* @param cause throwable recorded as the cause of this exception
*/
public DoNotRetryIOException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -65,8 +65,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
* Used by {@link HTable} and {@link HBaseAdmin}
*/
public class HConnectionManager implements HConstants {
private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
/*
* Not instantiable.
*/
@ -94,7 +92,6 @@ public class HConnectionManager implements HConstants {
if (connection == null) {
connection = new TableServers(conf);
HBASE_INSTANCES.put(conf, connection);
LOG.debug("Created new HBASE_INSTANCES");
}
}
return connection;
@ -131,7 +128,7 @@ public class HConnectionManager implements HConstants {
/* Encapsulates finding the servers for an HBase instance */
private static class TableServers implements ServerConnection, HConstants, Watcher {
private static final Log LOG = LogFactory.getLog(TableServers.class);
static final Log LOG = LogFactory.getLog(TableServers.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final long pause;
private final int numRetries;
@ -353,8 +350,7 @@ public class HConnectionManager implements HConstants {
MetaScannerVisitor visitor = new MetaScannerVisitor() {
public boolean processRow(Result result) throws IOException {
try {
byte[] value =
result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
byte[] value = result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
HRegionInfo info = null;
if (value != null) {
info = Writables.getHRegionInfo(value);
@ -411,9 +407,7 @@ public class HConnectionManager implements HConstants {
scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
ScannerCallable s = new ScannerCallable(this,
(Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME),
scan.getStartRow(),
scan);
HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan);
try {
// Open scanner
getRegionServerWithRetries(s);
@ -542,7 +536,7 @@ public class HConnectionManager implements HConstants {
*/
private HRegionLocation locateRegionInMeta(final byte [] parentTable,
final byte [] tableName, final byte [] row, boolean useCache)
throws IOException{
throws IOException {
HRegionLocation location = null;
// If supposed to be using the cache, then check it for a possible hit.
// Otherwise, delete any existing cached location so it won't interfere.
@ -969,7 +963,7 @@ public class HConnectionManager implements HConstants {
throw (DoNotRetryIOException) t;
}
}
return null;
return null;
}
private HRegionLocation

View File

@ -29,6 +29,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
@ -54,7 +56,6 @@ import org.apache.hadoop.hbase.util.Writables;
/**
* Used to communicate with a single HBase table
* TODO: checkAndSave in oldAPI
* TODO: Converting filters
* TODO: Regex deletes.
*/
public class HTable {
@ -1778,13 +1779,18 @@ public class HTable {
*/
protected class ClientScanner implements ResultScanner {
private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
// HEADSUP: The scan internal start row can change as we move through table.
private Scan scan;
private boolean closed = false;
// Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us.
private HRegionInfo currentRegion = null;
private ScannerCallable callable = null;
private final LinkedList<Result> cache = new LinkedList<Result>();
private final int scannerCaching = HTable.this.scannerCaching;
private final int caching = HTable.this.scannerCaching;
private long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
private Result lastResult = null;
protected ClientScanner(final Scan scan) {
if (CLIENT_LOG.isDebugEnabled()) {
@ -1804,7 +1810,7 @@ public class HTable {
}
public void initialize() throws IOException {
nextScanner(this.scannerCaching);
nextScanner(this.caching);
}
protected Scan getScan() {
@ -1814,10 +1820,12 @@ public class HTable {
/**
* @return the current value of the lastNext field
*/
// NOTE(review): lastNext is compared against scannerTimeout elsewhere in this
// scanner; where it gets updated is not visible here -- confirm before relying
// on it as "time of the last next() call".
protected long getTimestamp() {
return lastNext;
}
/*
* Gets a scanner for the next region.
* Returns false if there are no more scanners.
* Gets a scanner for the next region. If this.currentRegion != null, then
* we will move to the endrow of this.currentRegion. Else we will get
* scanner at the scan.getStartRow().
* @param nbRows
*/
private boolean nextScanner(int nbRows) throws IOException {
// Close the previous scanner if it's open
@ -1826,38 +1834,38 @@ public class HTable {
getConnection().getRegionServerWithRetries(callable);
this.callable = null;
}
// Where to start the next scanner
byte [] localStartKey = null;
// if we're at the end of the table, then close and return false
// to stop iterating
if (currentRegion != null) {
if (this.currentRegion != null) {
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Advancing forward from region " + currentRegion);
CLIENT_LOG.debug("Finished with region " + this.currentRegion);
}
byte [] endKey = currentRegion.getEndKey();
byte [] endKey = this.currentRegion.getEndKey();
if (endKey == null ||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
filterSaysStop(endKey)) {
close();
return false;
}
}
HRegionInfo oldRegion = this.currentRegion;
byte [] localStartKey =
oldRegion == null ? scan.getStartRow() : oldRegion.getEndKey();
localStartKey = endKey;
} else {
localStartKey = this.scan.getStartRow();
}
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
Bytes.toStringBinary(localStartKey) + "'");
}
}
try {
callable = getScannerCallable(localStartKey, nbRows);
// open a scanner on the region server starting at the
// Open a scanner on the region server starting at the
// beginning of the region
getConnection().getRegionServerWithRetries(callable);
currentRegion = callable.getHRegionInfo();
this.currentRegion = callable.getHRegionInfo();
} catch (IOException e) {
close();
throw e;
@ -1867,8 +1875,9 @@ public class HTable {
protected ScannerCallable getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s = new ScannerCallable(getConnection(),
getTableName(), localStartKey, scan);
getTableName(), scan);
s.setCaching(nbRows);
return s;
}
@ -1915,13 +1924,35 @@ public class HTable {
}
if (cache.size() == 0) {
Result [] values = null;
int countdown = this.scannerCaching;
int countdown = this.caching;
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
callable.setCaching(this.scannerCaching);
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean skipFirst = false;
do {
try {
values = getConnection().getRegionServerWithRetries(callable);
if (skipFirst) {
skipFirst = false;
// Reget.
values = getConnection().getRegionServerWithRetries(callable);
}
} catch (DoNotRetryIOException e) {
Throwable cause = e.getCause();
if (cause == null || !(cause instanceof NotServingRegionException)) {
throw e;
}
// Else, it's a signal from depths of ScannerCallable that we got an
// NSRE on a next and that we need to reset the scanner.
this.scan.setStartRow(this.lastResult.getRow());
// Clear region as flag to nextScanner to use this.scan.startRow.
this.currentRegion = null;
// Skip first row returned. We already let it out on previous
// invocation.
skipFirst = true;
continue;
} catch (IOException e) {
if (e instanceof UnknownScannerException &&
lastNext + scannerTimeout < System.currentTimeMillis()) {
@ -1936,6 +1967,7 @@ public class HTable {
for (Result rs : values) {
cache.add(rs);
countdown--;
this.lastResult = rs;
}
}
} while (countdown > 0 && nextScanner(countdown));
@ -1965,7 +1997,7 @@ public class HTable {
}
return resultSets.toArray(new Result[resultSets.size()]);
}
public void close() {
if (callable != null) {
callable.setClose();
@ -1998,7 +2030,7 @@ public class HTable {
return next != null;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
return true;
}

View File

@ -49,7 +49,7 @@ class MetaScanner implements HConstants {
ScannerCallable callable = null;
do {
Scan scan = new Scan(startRow).addFamily(CATALOG_FAMILY);
callable = new ScannerCallable(connection, META_TABLE_NAME, scan.getStartRow(), scan);
callable = new ScannerCallable(connection, META_TABLE_NAME, scan);
// Open scanner
connection.getRegionServerWithRetries(callable);
try {

View File

@ -1,3 +1,4 @@
/**
* Copyright 2008 The Apache Software Foundation
*
@ -22,7 +23,12 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.ipc.RemoteException;
import org.mortbay.log.Log;
/**
@ -34,20 +40,16 @@ public class ScannerCallable extends ServerCallable<Result[]> {
private boolean instantiated = false;
private boolean closed = false;
private Scan scan;
private byte [] startRow;
private int caching = 1;
/**
* @param connection
* @param tableName
* @param startRow
* @param scan
*/
public ScannerCallable (HConnection connection, byte [] tableName,
byte [] startRow, Scan scan) {
super(connection, tableName, startRow);
public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) {
super(connection, tableName, scan.getStartRow());
this.scan = scan;
this.startRow = startRow;
}
/**
@ -67,18 +69,42 @@ public class ScannerCallable extends ServerCallable<Result[]> {
*/
public Result [] call() throws IOException {
if (scannerId != -1L && closed) {
server.close(scannerId);
scannerId = -1L;
close();
} else if (scannerId == -1L && !closed) {
// open the scanner
scannerId = openScanner();
this.scannerId = openScanner();
} else {
Result [] rrs = server.next(scannerId, caching);
Result [] rrs = null;
try {
rrs = server.next(scannerId, caching);
} catch (IOException e) {
IOException ioe = null;
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
if (ioe != null && ioe instanceof NotServingRegionException) {
// Throw a DNRE so that we break out of cycle of calling NSRE
// when what we need is to open scanner against new location.
// Attach NSRE to signal client that it needs to resetup scanner.
throw new DoNotRetryIOException("Reset scanner", ioe);
}
}
return rrs == null || rrs.length == 0? null: rrs;
}
return null;
}
/*
 * Closes the server-side scanner named by this.scannerId, if there is one,
 * and then resets the id to -1L. An IOException raised by the server while
 * closing is logged as a warning and otherwise ignored.
 */
private void close() {
  if (this.scannerId != -1L) {
    try {
      this.server.close(this.scannerId);
    } catch (IOException e) {
      Log.warn("Ignore, probably already closed", e);
    }
    // Reset regardless of whether the remote close succeeded.
    this.scannerId = -1L;
  }
}
protected long openScanner() throws IOException {
return server.openScanner(
this.location.getRegionInfo().getRegionName(), scan);

View File

@ -729,7 +729,7 @@ public class HFile {
}
protected String toStringLastKey() {
return KeyValue.keyToString(getFirstKey());
return KeyValue.keyToString(getLastKey());
}
public long length() {

View File

@ -188,7 +188,7 @@ public class HLog implements HConstants, Syncable {
* @param listener
* @throws IOException
*/
public HLog(final FileSystem fs, final Path dir, final Configuration conf,
public HLog(final FileSystem fs, final Path dir, final HBaseConfiguration conf,
final LogRollListener listener)
throws IOException {
super();
@ -219,7 +219,7 @@ public class HLog implements HConstants, Syncable {
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
// Test if syncfs is available.
this.append = conf.getBoolean("dfs.support.append", false);
this.append = isAppend(conf);
Method m = null;
if (this.append) {
try {
@ -784,7 +784,7 @@ public class HLog implements HConstants, Syncable {
* @throws IOException
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
final FileSystem fs, final Configuration conf)
final FileSystem fs, final HBaseConfiguration conf)
throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
@ -833,7 +833,8 @@ public class HLog implements HConstants, Syncable {
* @return List of splits made.
*/
private static List<Path> splitLog(final Path rootDir,
final FileStatus [] logfiles, final FileSystem fs, final Configuration conf)
final FileStatus [] logfiles, final FileSystem fs,
final HBaseConfiguration conf)
throws IOException {
final Map<byte [], WriterAndPath> logWriters =
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR);
@ -848,11 +849,12 @@ public class HLog implements HConstants, Syncable {
// More means faster but bigger mem consumption */
int concurrentLogReads =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
// Is append supported?
boolean append = isAppend(conf);
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
for(int step = 0; step < maxSteps; step++) {
for (int step = 0; step < maxSteps; step++) {
final Map<byte[], LinkedList<HLogEntry>> logEntries =
new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
// Stop at logfiles.length when it's the last step
@ -867,7 +869,6 @@ public class HLog implements HConstants, Syncable {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
boolean append = conf.getBoolean("dfs.support.append", false);
recoverLog(fs, logfiles[i].getPath(), append);
SequenceFile.Reader in = null;
int count = 0;
@ -1021,6 +1022,24 @@ public class HLog implements HConstants, Syncable {
return splits;
}
/**
 * Decides whether append/sync support can be used.
 * @param conf configuration consulted for "dfs.support.append" (defaults to
 * false when unset)
 * @return True if append is enabled in <code>conf</code> and a no-argument
 * <code>syncFs</code> method is found on SequenceFile.Writer; false if append
 * is disabled or the method does not exist.
 */
private static boolean isAppend(final HBaseConfiguration conf) {
  if (!conf.getBoolean("dfs.support.append", false)) {
    return false;
  }
  try {
    // Probe only; the Method object itself is not needed here.
    SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
  } catch (SecurityException e) {
    // Reflection lookup was refused. The configured setting (true) stands.
    return true;
  } catch (NoSuchMethodException e) {
    return false;
  }
  return true;
}
/**
* Utility class that lets us keep track of the edit with its key
* Only used when splitting logs
@ -1158,10 +1177,9 @@ public class HLog implements HConstants, Syncable {
System.exit(-1);
}
}
Configuration conf = new HBaseConfiguration();
HBaseConfiguration conf = new HBaseConfiguration();
FileSystem fs = FileSystem.get(conf);
Path baseDir = new Path(conf.get(HBASE_DIR));
for (int i = 1; i < args.length; i++) {
Path logPath = new Path(args[i]);
if (!fs.exists(logPath)) {

View File

@ -110,7 +110,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* master as a region to close if the carrying regionserver is overloaded.
* Once set, it is never cleared.
*/
private final AtomicBoolean closing = new AtomicBoolean(false);
final AtomicBoolean closing = new AtomicBoolean(false);
private final RegionHistorian historian;
//////////////////////////////////////////////////////////////////////////////
@ -1671,8 +1671,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return this.basedir;
}
//TODO
/**
* RegionScanner is an iterator through a bunch of rows in an HRegion.
* <p>
@ -1710,9 +1708,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* Get the next row of results from this region.
* @param results list to append results to
* @return true if there are more rows, false if scanner is done
* @throws NotServingRegionException If this region is closing or closed
*/
public boolean next(List<KeyValue> results)
throws IOException {
if (closing.get() || closed.get()) {
close();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
// This method should probably be reorganized a bit... has gotten messy
KeyValue kv = this.storeHeap.peek();
if (kv == null) {

View File

@ -803,7 +803,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
*/
private Throwable cleanup(final Throwable t, final String msg) {
if (msg == null) {
LOG.error(RemoteExceptionHandler.checkThrowable(t));
LOG.error("", RemoteExceptionHandler.checkThrowable(t));
} else {
LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
}
@ -1890,7 +1890,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
public Result [] next(final long scannerId, int nbRows) throws IOException {
try {
String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName);
InternalScanner s = this.scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
@ -1918,6 +1918,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
return results.toArray(new Result[0]);
} catch (Throwable t) {
if (t instanceof NotServingRegionException) {
this.scanners.remove(scannerId);
}
throw convertThrowableToIOE(cleanup(t));
}
}
@ -1978,9 +1981,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
boolean writeToWAL = true;
this.cacheFlusher.reclaimMemStoreMemory();
this.requestCount.incrementAndGet();
Integer lock = getLockFromId(delete.getLockId());
Integer lid = getLockFromId(delete.getLockId());
HRegion region = getRegion(regionName);
region.delete(delete, lock, writeToWAL);
region.delete(delete, lid, writeToWAL);
} catch(WrongRegionException ex) {
} catch (NotServingRegionException ex) {
} catch (Throwable t) {

View File

@ -21,7 +21,10 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
@ -34,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* Tests forced splitting of HTable
*/
public class TestForceSplit extends HBaseClusterTestCase {
static final Log LOG = LogFactory.getLog(TestForceSplit.class);
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] columnName = Bytes.toBytes("a:");
@ -44,7 +48,7 @@ public class TestForceSplit extends HBaseClusterTestCase {
}
/**
* the test
* Tests forcing split from client and having scanners successfully ride over split.
* @throws Exception
* @throws IOException
*/
@ -55,7 +59,7 @@ public class TestForceSplit extends HBaseClusterTestCase {
htd.addFamily(new HColumnDescriptor(columnName));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(htd);
HTable table = new HTable(conf, tableName);
final HTable table = new HTable(conf, tableName);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 < 'z'; b1++) {
@ -88,31 +92,50 @@ public class TestForceSplit extends HBaseClusterTestCase {
scanner.close();
assertEquals(rowCount, rows);
// tell the master to split the table
admin.split(Bytes.toString(tableName));
// give some time for the split to happen
Thread.sleep(15 * 1000);
// check again table = new HTable(conf, tableName);
m = table.getRegionsInfo();
System.out.println("Regions after split (" + m.size() + "): " + m);
// should have two regions now
assertTrue(m.size() == 2);
// Verify row count
// Have an outstanding scan going on to make sure we can scan over splits.
scan = new Scan();
scanner = table.getScanner(scan);
rows = 0;
for(Result result : scanner) {
// Scan first row so we are into first region before split happens.
scanner.next();
final AtomicInteger count = new AtomicInteger(0);
Thread t = new Thread("CheckForSplit") {
public void run() {
for (int i = 0; i < 20; i++) {
try {
sleep(1000);
} catch (InterruptedException e) {
continue;
}
// check again table = new HTable(conf, tableName);
Map<HRegionInfo, HServerAddress> regions = null;
try {
regions = table.getRegionsInfo();
} catch (IOException e) {
e.printStackTrace();
}
if (regions == null) continue;
count.set(regions.size());
if (count.get() >= 2) break;
LOG.debug("Cycle waiting on split");
}
}
};
t.start();
// tell the master to split the table
admin.split(Bytes.toString(tableName));
t.join();
// Verify row count
rows = 1; // We counted one row above.
for (Result result : scanner) {
rows++;
if(rows > rowCount) {
if (rows > rowCount) {
scanner.close();
assertTrue("Have already scanned more rows than expected (" +
rowCount + ")", false);
assertTrue("Scanned more than expected (" + rowCount + ")", false);
}
}
scanner.close();
assertEquals(rowCount, rows);
}
}
}