HBASE-2324 Refactoring of TableRecordReader (mapred / mapreduce) for reuse outside the scope of InputSplit / RecordReader
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@923404 13f79535-47bb-0310-9956-ffa450edef68
parent 11999bd2b3
commit 9013c837e8
@@ -434,6 +434,8 @@ Release 0.21.0 - Unreleased
               (Kay Kay via Stack)
    HBASE-2279  Hbase Shell does not have any tests (Alexey Kovyrin via Stack)
    HBASE-2314  [shell] Support for getting counters (Alexey Kovyrin via Stack)
+   HBASE-2324  Refactoring of TableRecordReader (mapred / mapreduce) for reuse
+               outside the scope of InputSplit / RecordReader (Kay Kay via Stack)
 
  NEW FEATURES
    HBASE-1961  HBase EC2 scripts
@@ -24,23 +24,16 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
@@ -80,157 +73,6 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
 
-  /**
-   * Iterate over an HBase table data, return (Text, RowResult) pairs
-   */
-  protected class TableRecordReader
-  implements RecordReader<ImmutableBytesWritable, Result> {
-    private byte [] startRow;
-    private byte [] endRow;
-    private byte [] lastRow;
-    private Filter trrRowFilter;
-    private ResultScanner scanner;
-    private HTable htable;
-    private byte [][] trrInputColumns;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow
-     * @throws IOException
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      if ((endRow != null) && (endRow.length > 0)) {
-        if (trrRowFilter != null) {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          scan.setFilter(trrRowFilter);
-          this.scanner = this.htable.getScanner(scan);
-        } else {
-          LOG.debug("TIFB.restart, firstRow: " +
-            Bytes.toStringBinary(firstRow) + ", endRow: " +
-            Bytes.toStringBinary(endRow));
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
-        }
-      } else {
-        LOG.debug("TIFB.restart, firstRow: " +
-          Bytes.toStringBinary(firstRow) + ", no endRow");
-
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
-        // scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
-      }
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException
-     */
-    public void init() throws IOException {
-      restart(startRow);
-    }
-
-    /**
-     * @param htable the {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * @param inputColumns the columns to be placed in {@link Result}.
-     */
-    public void setInputColumns(final byte [][] inputColumns) {
-      this.trrInputColumns = inputColumns;
-    }
-
-    /**
-     * @param startRow the first row in the split
-     */
-    public void setStartRow(final byte [] startRow) {
-      this.startRow = startRow;
-    }
-
-    /**
-     *
-     * @param endRow the last row in the split
-     */
-    public void setEndRow(final byte [] endRow) {
-      this.endRow = endRow;
-    }
-
-    /**
-     * @param rowFilter the {@link Filter} to be used.
-     */
-    public void setRowFilter(Filter rowFilter) {
-      this.trrRowFilter = rowFilter;
-    }
-
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * @return ImmutableBytesWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createKey()
-     */
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
-    }
-
-    /**
-     * @return RowResult
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createValue()
-     */
-    public Result createValue() {
-      return new Result();
-    }
-
-    public long getPos() {
-      // This should be the ordinal tuple in the range;
-      // not clear how to calculate...
-      return 0;
-    }
-
-    public float getProgress() {
-      // Depends on the total number of tuples and getPos
-      return 0;
-    }
-
-    /**
-     * @param key HStoreKey as input key.
-     * @param value MapWritable as input value
-     * @return true if there was more data
-     * @throws IOException
-     */
-    public boolean next(ImmutableBytesWritable key, Result value)
-    throws IOException {
-      Result result;
-      try {
-        result = this.scanner.next();
-      } catch (UnknownScannerException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        restart(lastRow);
-        this.scanner.next();    // skip presumed already mapped row
-        result = this.scanner.next();
-      }
-
-      if (result != null && result.size() > 0) {
-        key.set(result.getRow());
-        lastRow = key.get();
-        Writables.copyWritable(result, value);
-        return true;
-      }
-      return false;
-    }
-  }
-
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
New file: org.apache.hadoop.hbase.mapred.TableRecordReader
@@ -0,0 +1,138 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Iterate over an HBase table data, return (Text, RowResult) pairs
 */
public class TableRecordReader
implements RecordReader<ImmutableBytesWritable, Result> {

  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow
   * @throws IOException
   */
  public void restart(byte[] firstRow) throws IOException {
    this.recordReaderImpl.restart(firstRow);
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException
   */
  public void init() throws IOException {
    this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
  }

  /**
   * @param htable the {@link HTable} to scan.
   */
  public void setHTable(HTable htable) {
    this.recordReaderImpl.setHTable(htable);
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte [][] inputColumns) {
    this.recordReaderImpl.setInputColumns(inputColumns);
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte [] startRow) {
    this.recordReaderImpl.setStartRow(startRow);
  }

  /**
   *
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte [] endRow) {
    this.recordReaderImpl.setEndRow(endRow);
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.recordReaderImpl.setRowFilter(rowFilter);
  }

  public void close() {
    this.recordReaderImpl.close();
  }

  /**
   * @return ImmutableBytesWritable
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return this.recordReaderImpl.createKey();
  }

  /**
   * @return RowResult
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return this.recordReaderImpl.createValue();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return this.recordReaderImpl.getPos();
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return this.recordReaderImpl.getProgress();
  }

  /**
   * @param key HStoreKey as input key.
   * @param value MapWritable as input value
   * @return true if there was more data
   * @throws IOException
   */
  public boolean next(ImmutableBytesWritable key, Result value)
  throws IOException {
    return this.recordReaderImpl.next(key, value);
  }
}
New file: org.apache.hadoop.hbase.mapred.TableRecordReaderImpl
@@ -0,0 +1,192 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;

import org.apache.hadoop.util.StringUtils;

/**
 * Iterate over an HBase table data, return (Text, RowResult) pairs
 */
public class TableRecordReaderImpl {
  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  private byte [] startRow;
  private byte [] endRow;
  private byte [] lastRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private HTable htable;
  private byte [][] trrInputColumns;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow
   * @throws IOException
   */
  public void restart(byte[] firstRow) throws IOException {
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        scan.addColumns(trrInputColumns);
        scan.setFilter(trrRowFilter);
        this.scanner = this.htable.getScanner(scan);
      } else {
        LOG.debug("TIFB.restart, firstRow: " +
          Bytes.toStringBinary(firstRow) + ", endRow: " +
          Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        scan.addColumns(trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " +
        Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      scan.addColumns(trrInputColumns);
      // scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link HTable} to scan.
   */
  public void setHTable(HTable htable) {
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte [][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte [] startRow) {
    this.startRow = startRow;
  }

  /**
   *
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte [] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    this.scanner.close();
  }

  /**
   * @return ImmutableBytesWritable
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return RowResult
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * @param key HStoreKey as input key.
   * @param value MapWritable as input value
   * @return true if there was more data
   * @throws IOException
   */
  public boolean next(ImmutableBytesWritable key, Result value)
  throws IOException {
    Result result;
    try {
      result = this.scanner.next();
    } catch (UnknownScannerException e) {
      LOG.debug("recovered from " + StringUtils.stringifyException(e));
      restart(lastRow);
      this.scanner.next();    // skip presumed already mapped row
      result = this.scanner.next();
    }

    if (result != null && result.size() > 0) {
      key.set(result.getRow());
      lastRow = key.get();
      Writables.copyWritable(result, value);
      return true;
    }
    return false;
  }
}
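The point of this refactoring is that the scanning logic above no longer lives inside a mapred InputSplit / RecordReader, so org.apache.hadoop.hbase.mapred.TableRecordReaderImpl can be driven on its own. Below is a minimal standalone sketch of that reuse; it is not part of the commit, and the table name ("mytable"), column family ("info") and row keys are hypothetical placeholders.

// Hypothetical standalone use of the mapred TableRecordReaderImpl; not from the commit.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.Bytes;

public class StandaloneTableScan {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable(new HBaseConfiguration(), "mytable");   // assumed table
    TableRecordReaderImpl reader = new TableRecordReaderImpl();
    reader.setHTable(table);
    reader.setInputColumns(new byte[][] { Bytes.toBytes("info:") });  // assumed family
    reader.setStartRow(Bytes.toBytes("row-0000"));
    reader.setEndRow(Bytes.toBytes("row-9999"));
    reader.init();                                   // opens the scanner at startRow
    ImmutableBytesWritable key = reader.createKey();
    Result value = reader.createValue();
    while (reader.next(key, value)) {                // same contract as the mapred RecordReader
      System.out.println(Bytes.toString(key.get()) + " -> " + value.size() + " cells");
    }
    reader.close();
  }
}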
@@ -79,154 +79,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
 
-  /**
-   * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
-   * pairs.
-   */
-  protected class TableRecordReader
-  extends RecordReader<ImmutableBytesWritable, Result> {
-
-    private ResultScanner scanner = null;
-    private Scan scan = null;
-    private HTable htable = null;
-    private byte[] lastRow = null;
-    private ImmutableBytesWritable key = null;
-    private Result value = null;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow The first row to start at.
-     * @throws IOException When restarting fails.
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      Scan newScan = new Scan(scan);
-      newScan.setStartRow(firstRow);
-      this.scanner = this.htable.getScanner(newScan);
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException When restarting the scan fails.
-     */
-    public void init() throws IOException {
-      restart(scan.getStartRow());
-    }
-
-    /**
-     * Sets the HBase table.
-     *
-     * @param htable The {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * Sets the scan defining the actual details like columns etc.
-     *
-     * @param scan The scan to set.
-     */
-    public void setScan(Scan scan) {
-      this.scan = scan;
-    }
-
-    /**
-     * Closes the split.
-     *
-     * @see org.apache.hadoop.mapreduce.RecordReader#close()
-     */
-    @Override
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * Returns the current key.
-     *
-     * @return The current key.
-     * @throws IOException
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-     */
-    @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException,
-        InterruptedException {
-      return key;
-    }
-
-    /**
-     * Returns the current value.
-     *
-     * @return The current value.
-     * @throws IOException When the value is faulty.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
-     */
-    @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    /**
-     * Initializes the reader.
-     *
-     * @param inputsplit The split to work with.
-     * @param context The current task context.
-     * @throws IOException When setting up the reader fails.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
-     *   org.apache.hadoop.mapreduce.InputSplit,
-     *   org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
-    @Override
-    public void initialize(InputSplit inputsplit,
-        TaskAttemptContext context) throws IOException,
-        InterruptedException {
-    }
-
-    /**
-     * Positions the record reader to the next record.
-     *
-     * @return <code>true</code> if there was another record.
-     * @throws IOException When reading the record failed.
-     * @throws InterruptedException When the job was aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (key == null) key = new ImmutableBytesWritable();
-      if (value == null) value = new Result();
-      try {
-        value = this.scanner.next();
-      } catch (IOException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        restart(lastRow);
-        scanner.next();    // skip presumed already mapped row
-        value = scanner.next();
-      }
-      if (value != null && value.size() > 0) {
-        key.set(value.getRow());
-        lastRow = key.get();
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * The current progress of the record reader through its data.
-     *
-     * @return A number between 0.0 and 1.0, the fraction of the data read.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
-     */
-    @Override
-    public float getProgress() {
-      // Depends on the total number of tuples
-      return 0;
-    }
-  }
-
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
New file: org.apache.hadoop.hbase.mapreduce.TableRecordReader
@@ -0,0 +1,155 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
 * pairs.
 */
public class TableRecordReader
extends RecordReader<ImmutableBytesWritable, Result> {

  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    this.recordReaderImpl.restart(firstRow);
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException When restarting the scan fails.
   */
  public void init() throws IOException {
    this.recordReaderImpl.init();
  }

  /**
   * Sets the HBase table.
   *
   * @param htable The {@link HTable} to scan.
   */
  public void setHTable(HTable htable) {
    this.recordReaderImpl.setHTable(htable);
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.recordReaderImpl.setScan(scan);
  }

  /**
   * Closes the split.
   *
   * @see org.apache.hadoop.mapreduce.RecordReader#close()
   */
  @Override
  public void close() {
    this.recordReaderImpl.close();
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
   */
  @Override
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return this.recordReaderImpl.getCurrentKey();
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
   */
  @Override
  public Result getCurrentValue() throws IOException, InterruptedException {
    return this.recordReaderImpl.getCurrentValue();
  }

  /**
   * Initializes the reader.
   *
   * @param inputsplit The split to work with.
   * @param context The current task context.
   * @throws IOException When setting up the reader fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
   *   org.apache.hadoop.mapreduce.InputSplit,
   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return this.recordReaderImpl.nextKeyValue();
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
   */
  @Override
  public float getProgress() {
    return this.recordReaderImpl.getProgress();
  }
}
New file: org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl
@@ -0,0 +1,157 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.util.StringUtils;

/**
 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
 * pairs.
 */
public class TableRecordReaderImpl {

  static final Log LOG = LogFactory.getLog(TableRecordReader.class);

  private ResultScanner scanner = null;
  private Scan scan = null;
  private HTable htable = null;
  private byte[] lastRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan newScan = new Scan(scan);
    newScan.setStartRow(firstRow);
    this.scanner = this.htable.getScanner(newScan);
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException When restarting the scan fails.
   */
  public void init() throws IOException {
    restart(scan.getStartRow());
  }

  /**
   * Sets the HBase table.
   *
   * @param htable The {@link HTable} to scan.
   */
  public void setHTable(HTable htable) {
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Closes the split.
   */
  public void close() {
    this.scanner.close();
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) key = new ImmutableBytesWritable();
    if (value == null) value = new Result();
    try {
      value = this.scanner.next();
    } catch (IOException e) {
      LOG.debug("recovered from " + StringUtils.stringifyException(e));
      restart(lastRow);
      scanner.next();    // skip presumed already mapped row
      value = scanner.next();
    }
    if (value != null && value.size() > 0) {
      key.set(value.getRow());
      lastRow = key.get();
      return true;
    }
    return false;
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }

}
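Likewise, the mapreduce-side org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl only needs an HTable and a Scan, not an InputSplit or TaskAttemptContext. A minimal standalone sketch follows; it is not part of the commit, and the table and family names are hypothetical placeholders.

// Hypothetical standalone use of the mapreduce TableRecordReaderImpl; not from the commit.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.Bytes;

public class StandaloneScanDriver {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable(new HBaseConfiguration(), "mytable");   // assumed table
    Scan scan = new Scan(Bytes.toBytes("row-0000"), Bytes.toBytes("row-9999"));
    scan.addFamily(Bytes.toBytes("info"));                            // assumed family
    TableRecordReaderImpl reader = new TableRecordReaderImpl();
    reader.setHTable(table);
    reader.setScan(scan);
    reader.init();                          // clones the scan and opens the scanner
    while (reader.nextKeyValue()) {         // same contract as the mapreduce RecordReader
      System.out.println(Bytes.toString(reader.getCurrentKey().get())
          + " -> " + reader.getCurrentValue().size() + " cells");
    }
    reader.close();
  }
}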