HBASE-816 TableMap should survive USE
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@684843 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa86731a5b
commit
f34578157c
|
@ -10,6 +10,7 @@ Release 0.3.0 - Unreleased
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-801 When a table haven't disable, shell could response in a "user
|
HBASE-801 When a table haven't disable, shell could response in a "user
|
||||||
friendly" way.
|
friendly" way.
|
||||||
|
HBASE-816 TableMap should survive USE (Andrew Purtell via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Scanner;
|
import org.apache.hadoop.hbase.client.Scanner;
|
||||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.InputSplit;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.RecordReader;
|
import org.apache.hadoop.mapred.RecordReader;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
|
* A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
|
||||||
|
@ -85,17 +87,18 @@ implements InputFormat<ImmutableBytesWritable, RowResult> {
|
||||||
implements RecordReader<ImmutableBytesWritable, RowResult> {
|
implements RecordReader<ImmutableBytesWritable, RowResult> {
|
||||||
private byte [] startRow;
|
private byte [] startRow;
|
||||||
private byte [] endRow;
|
private byte [] endRow;
|
||||||
|
private byte [] lastRow;
|
||||||
private RowFilterInterface trrRowFilter;
|
private RowFilterInterface trrRowFilter;
|
||||||
private Scanner scanner;
|
private Scanner scanner;
|
||||||
private HTable htable;
|
private HTable htable;
|
||||||
private byte [][] trrInputColumns;
|
private byte [][] trrInputColumns;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the scanner. Not done in constructor to allow for extension.
|
* Restart from survivable exceptions by creating a new scanner.
|
||||||
*
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void init() throws IOException {
|
public void restart(byte[] firstRow) throws IOException {
|
||||||
if ((endRow != null) && (endRow.length > 0)) {
|
if ((endRow != null) && (endRow.length > 0)) {
|
||||||
if (trrRowFilter != null) {
|
if (trrRowFilter != null) {
|
||||||
final Set<RowFilterInterface> rowFiltersSet =
|
final Set<RowFilterInterface> rowFiltersSet =
|
||||||
|
@ -107,14 +110,23 @@ implements InputFormat<ImmutableBytesWritable, RowResult> {
|
||||||
rowFiltersSet));
|
rowFiltersSet));
|
||||||
} else {
|
} else {
|
||||||
this.scanner =
|
this.scanner =
|
||||||
this.htable.getScanner(trrInputColumns, startRow, endRow);
|
this.htable.getScanner(trrInputColumns, firstRow, endRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
this.scanner =
|
this.scanner =
|
||||||
this.htable.getScanner(trrInputColumns, startRow, trrRowFilter);
|
this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the scanner. Not done in constructor to allow for extension.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void init() throws IOException {
|
||||||
|
restart(startRow);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param htable the {@link HTable} to scan.
|
* @param htable the {@link HTable} to scan.
|
||||||
*/
|
*/
|
||||||
|
@ -190,19 +202,25 @@ implements InputFormat<ImmutableBytesWritable, RowResult> {
|
||||||
/**
|
/**
|
||||||
* @param key HStoreKey as input key.
|
* @param key HStoreKey as input key.
|
||||||
* @param value MapWritable as input value
|
* @param value MapWritable as input value
|
||||||
*
|
|
||||||
* Converts Scanner.next() to Text, RowResult
|
|
||||||
*
|
|
||||||
* @return true if there was more data
|
* @return true if there was more data
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public boolean next(ImmutableBytesWritable key, RowResult value)
|
public boolean next(ImmutableBytesWritable key, RowResult value)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
RowResult result = this.scanner.next();
|
RowResult result;
|
||||||
|
try {
|
||||||
|
result = this.scanner.next();
|
||||||
|
} catch (UnknownScannerException e) {
|
||||||
|
LOG.debug("recovered from " + StringUtils.stringifyException(e));
|
||||||
|
restart(lastRow);
|
||||||
|
this.scanner.next(); // skip presumed already mapped row
|
||||||
|
result = this.scanner.next();
|
||||||
|
}
|
||||||
boolean hasMore = result != null && result.size() > 0;
|
boolean hasMore = result != null && result.size() > 0;
|
||||||
if (hasMore) {
|
if (hasMore) {
|
||||||
key.set(result.getRow());
|
key.set(result.getRow());
|
||||||
|
lastRow = key.get();
|
||||||
Writables.copyWritable(result, value);
|
Writables.copyWritable(result, value);
|
||||||
}
|
}
|
||||||
return hasMore;
|
return hasMore;
|
||||||
|
|
Loading…
Reference in New Issue