diff --git a/CHANGES.txt b/CHANGES.txt
index bb464b10c4c..2a794123bc5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -434,6 +434,8 @@ Release 0.21.0 - Unreleased
               (Kay Kay via Stack)
    HBASE-2279  Hbase Shell does not have any tests (Alexey Kovyrin via Stack)
    HBASE-2314  [shell] Support for getting counters (Alexey Kovyrin via Stack)
+   HBASE-2324  Refactoring of TableRecordReader (mapred / mapreduce) for reuse
+               outside the scope of InputSplit / RecordReader (Kay Kay via Stack)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
index f5eca94ad73..56384e179d9 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -24,23 +24,16 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
@@ -80,157 +73,6 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
 
-  /**
-   * Iterate over an HBase table data, return (Text, RowResult) pairs
-   */
-  protected class TableRecordReader
-  implements RecordReader<ImmutableBytesWritable, Result> {
-    private byte [] startRow;
-    private byte [] endRow;
-    private byte [] lastRow;
-    private Filter trrRowFilter;
-    private ResultScanner scanner;
-    private HTable htable;
-    private byte [][] trrInputColumns;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow
-     * @throws IOException
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      if ((endRow != null) && (endRow.length > 0)) {
-        if (trrRowFilter != null) {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          scan.setFilter(trrRowFilter);
-          this.scanner = this.htable.getScanner(scan);
-        } else {
-          LOG.debug("TIFB.restart, firstRow: " +
-            Bytes.toStringBinary(firstRow) + ", endRow: " +
-            Bytes.toStringBinary(endRow));
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
-        }
-      } else {
-        LOG.debug("TIFB.restart, firstRow: " +
-          Bytes.toStringBinary(firstRow) + ", no endRow");
-
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
-//        scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
-      }
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException
-     */
-    public void init() throws IOException {
-      restart(startRow);
-    }
-
-    /**
-     * @param htable the {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * @param inputColumns the columns to be placed in {@link Result}.
-     */
-    public void setInputColumns(final byte [][] inputColumns) {
-      this.trrInputColumns = inputColumns;
-    }
-
-    /**
-     * @param startRow the first row in the split
-     */
-    public void setStartRow(final byte [] startRow) {
-      this.startRow = startRow;
-    }
-
-    /**
-     *
-     * @param endRow the last row in the split
-     */
-    public void setEndRow(final byte [] endRow) {
-      this.endRow = endRow;
-    }
-
-    /**
-     * @param rowFilter the {@link Filter} to be used.
-     */
-    public void setRowFilter(Filter rowFilter) {
-      this.trrRowFilter = rowFilter;
-    }
-
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * @return ImmutableBytesWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createKey()
-     */
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
-    }
-
-    /**
-     * @return RowResult
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createValue()
-     */
-    public Result createValue() {
-      return new Result();
-    }
-
-    public long getPos() {
-      // This should be the ordinal tuple in the range;
-      // not clear how to calculate...
-      return 0;
-    }
-
-    public float getProgress() {
-      // Depends on the total number of tuples and getPos
-      return 0;
-    }
-
-    /**
-     * @param key HStoreKey as input key.
-     * @param value MapWritable as input value
-     * @return true if there was more data
-     * @throws IOException
-     */
-    public boolean next(ImmutableBytesWritable key, Result value)
-    throws IOException {
-      Result result;
-      try {
-        result = this.scanner.next();
-      } catch (UnknownScannerException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        restart(lastRow);
-        this.scanner.next();    // skip presumed already mapped row
-        result = this.scanner.next();
-      }
-
-      if (result != null && result.size() > 0) {
-        key.set(result.getRow());
-        lastRow = key.get();
-        Writables.copyWritable(result, value);
-        return true;
-      }
-      return false;
-    }
-  }
-
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
new file mode 100644
index 00000000000..a6a8995b82e
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+
+/**
+ * Iterate over an HBase table data, return (Text, RowResult) pairs
+ */
+public class TableRecordReader
+implements RecordReader<ImmutableBytesWritable, Result> {
+
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
+  }
+
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.recordReaderImpl.setHTable(htable);
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.recordReaderImpl.setInputColumns(inputColumns);
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.recordReaderImpl.setStartRow(startRow);
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.recordReaderImpl.setEndRow(endRow);
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.recordReaderImpl.setRowFilter(rowFilter);
+  }
+
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return this.recordReaderImpl.createKey();
+  }
+
+  /**
+   * @return RowResult
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return this.recordReaderImpl.createValue();
+  }
+
+  public long getPos() {
+
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return this.recordReaderImpl.getPos();
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return this.recordReaderImpl.getProgress();
+  }
+
+  /**
+   * @param key HStoreKey as input key.
+   * @param value MapWritable as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    return this.recordReaderImpl.next(key, value);
+  }
+}
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
new file mode 100644
index 00000000000..34da93707a7
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
@@ -0,0 +1,192 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+
+import org.apache.hadoop.util.StringUtils;
+
+
+/**
+ * Iterate over an HBase table data, return (Text, RowResult) pairs
+ */
+public class TableRecordReaderImpl {
+  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  private byte [] startRow;
+  private byte [] endRow;
+  private byte [] lastRow;
+  private Filter trrRowFilter;
+  private ResultScanner scanner;
+  private HTable htable;
+  private byte [][] trrInputColumns;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        Scan scan = new Scan(firstRow, endRow);
+        scan.addColumns(trrInputColumns);
+        scan.setFilter(trrRowFilter);
+        this.scanner = this.htable.getScanner(scan);
+      } else {
+        LOG.debug("TIFB.restart, firstRow: " +
+          Bytes.toStringBinary(firstRow) + ", endRow: " +
+          Bytes.toStringBinary(endRow));
+        Scan scan = new Scan(firstRow, endRow);
+        scan.addColumns(trrInputColumns);
+        this.scanner = this.htable.getScanner(scan);
+      }
+    } else {
+      LOG.debug("TIFB.restart, firstRow: " +
+        Bytes.toStringBinary(firstRow) + ", no endRow");
+
+      Scan scan = new Scan(firstRow);
+      scan.addColumns(trrInputColumns);
+//      scan.setFilter(trrRowFilter);
+      this.scanner = this.htable.getScanner(scan);
+    }
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    restart(startRow);
+  }
+
+  byte[] getStartRow() {
+    return this.startRow;
+  }
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.trrInputColumns = inputColumns;
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.startRow = startRow;
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.endRow = endRow;
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  public void close() {
+    this.scanner.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return new ImmutableBytesWritable();
+  }
+
+  /**
+   * @return RowResult
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return new Result();
+  }
+
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+  /**
+   * @param key HStoreKey as input key.
+   * @param value MapWritable as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    Result result;
+    try {
+      result = this.scanner.next();
+    } catch (UnknownScannerException e) {
+      LOG.debug("recovered from " + StringUtils.stringifyException(e));
+      restart(lastRow);
+      this.scanner.next();    // skip presumed already mapped row
+      result = this.scanner.next();
+    }
+
+    if (result != null && result.size() > 0) {
+      key.set(result.getRow());
+      lastRow = key.get();
+      Writables.copyWritable(result, value);
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 4c41abe01ee..58145ee6ee2 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -79,154 +79,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
 
-  /**
-   * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
-   * pairs.
-   */
-  protected class TableRecordReader
-  extends RecordReader<ImmutableBytesWritable, Result> {
-
-    private ResultScanner scanner = null;
-    private Scan scan = null;
-    private HTable htable = null;
-    private byte[] lastRow = null;
-    private ImmutableBytesWritable key = null;
-    private Result value = null;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow  The first row to start at.
-     * @throws IOException When restarting fails.
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      Scan newScan = new Scan(scan);
-      newScan.setStartRow(firstRow);
-      this.scanner = this.htable.getScanner(newScan);
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException When restarting the scan fails.
-     */
-    public void init() throws IOException {
-      restart(scan.getStartRow());
-    }
-
-    /**
-     * Sets the HBase table.
-     *
-     * @param htable  The {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * Sets the scan defining the actual details like columns etc.
-     *
-     * @param scan  The scan to set.
-     */
-    public void setScan(Scan scan) {
-      this.scan = scan;
-    }
-
-    /**
-     * Closes the split.
-     *
-     * @see org.apache.hadoop.mapreduce.RecordReader#close()
-     */
-    @Override
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * Returns the current key.
-     *
-     * @return The current key.
-     * @throws IOException
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-     */
-    @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException,
-        InterruptedException {
-      return key;
-    }
-
-    /**
-     * Returns the current value.
-     *
-     * @return The current value.
-     * @throws IOException When the value is faulty.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
-     */
-    @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    /**
-     * Initializes the reader.
-     *
-     * @param inputsplit  The split to work with.
-     * @param context  The current task context.
-     * @throws IOException When setting up the reader fails.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
-     *   org.apache.hadoop.mapreduce.InputSplit,
-     *   org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
-    @Override
-    public void initialize(InputSplit inputsplit,
-        TaskAttemptContext context) throws IOException,
-        InterruptedException {
-    }
-
-    /**
-     * Positions the record reader to the next record.
-     *
-     * @return <code>true</code> if there was another record.
-     * @throws IOException When reading the record failed.
-     * @throws InterruptedException When the job was aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (key == null) key = new ImmutableBytesWritable();
-      if (value == null) value = new Result();
-      try {
-        value = this.scanner.next();
-      } catch (IOException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        restart(lastRow);
-        scanner.next();    // skip presumed already mapped row
-        value = scanner.next();
-      }
-      if (value != null && value.size() > 0) {
-        key.set(value.getRow());
-        lastRow = key.get();
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * The current progress of the record reader through its data.
-     *
-     * @return A number between 0.0 and 1.0, the fraction of the data read.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
-     */
-    @Override
-    public float getProgress() {
-      // Depends on the total number of tuples
-      return 0;
-    }
-  }
-
+  
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
new file mode 100644
index 00000000000..903b4a820bd
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+public class TableRecordReader
+extends RecordReader<ImmutableBytesWritable, Result> {
+
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException When restarting the scan fails.
+   */
+  public void init() throws IOException {
+    this.recordReaderImpl.init();
+  }
+
+  /**
+   * Sets the HBase table.
+   *
+   * @param htable  The {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.recordReaderImpl.setHTable(htable);
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.recordReaderImpl.setScan(scan);
+  }
+
+  /**
+   * Closes the split.
+   *
+   * @see org.apache.hadoop.mapreduce.RecordReader#close()
+   */
+  @Override
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * Returns the current key.
+   *
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+   */
+  @Override
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return this.recordReaderImpl.getCurrentKey();
+  }
+
+  /**
+   * Returns the current value.
+   *
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
+   */
+  @Override
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.getCurrentValue();
+  }
+
+  /**
+   * Initializes the reader.
+   *
+   * @param inputsplit  The split to work with.
+   * @param context  The current task context.
+   * @throws IOException When setting up the reader fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+   *   org.apache.hadoop.mapreduce.InputSplit,
+   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public void initialize(InputSplit inputsplit,
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+  }
+
+  /**
+   * Positions the record reader to the next record.
+   *
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.nextKeyValue();
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   *
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+   */
+  @Override
+  public float getProgress() {
+    return this.recordReaderImpl.getProgress();
+  }
+}
diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
new file mode 100644
index 00000000000..6a2d8bb3fe8
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+public class TableRecordReaderImpl {
+
+
+  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  private ResultScanner scanner = null;
+  private Scan scan = null;
+  private HTable htable = null;
+  private byte[] lastRow = null;
+  private ImmutableBytesWritable key = null;
+  private Result value = null;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    Scan newScan = new Scan(scan);
+    newScan.setStartRow(firstRow);
+    this.scanner = this.htable.getScanner(newScan);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException When restarting the scan fails.
+   */
+  public void init() throws IOException {
+    restart(scan.getStartRow());
+  }
+
+  /**
+   * Sets the HBase table.
+   *
+   * @param htable  The {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Closes the split.
+   *
+   *
+   */
+  public void close() {
+    this.scanner.close();
+  }
+
+  /**
+   * Returns the current key.
+   *
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   */
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return key;
+  }
+
+  /**
+   * Returns the current value.
+   *
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   */
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+
+  /**
+   * Positions the record reader to the next record.
+   *
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   */
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (key == null) key = new ImmutableBytesWritable();
+    if (value == null) value = new Result();
+    try {
+      value = this.scanner.next();
+    } catch (IOException e) {
+      LOG.debug("recovered from " + StringUtils.stringifyException(e));
+      restart(lastRow);
+      scanner.next();    // skip presumed already mapped row
+      value = scanner.next();
+    }
+    if (value != null && value.size() > 0) {
+      key.set(value.getRow());
+      lastRow = key.get();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   *
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   */
+  public float getProgress() {
+    // Depends on the total number of tuples
+    return 0;
+  }
+
+}
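
Usage note (reviewer aid, not part of the patch): with the scan logic pulled out of the InputFormat inner classes, TableRecordReaderImpl can be driven directly, with no InputSplit or RecordReader involved, which is the reuse HBASE-2324 is after. Below is a minimal sketch of such standalone use of the mapred implementation; the table name "mytable", the column "info:name", and the row keys are hypothetical, and error handling is elided.

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapred.TableRecordReaderImpl;
  import org.apache.hadoop.hbase.util.Bytes;

  public class StandaloneScan {
    public static void main(String[] args) throws Exception {
      // Point the reader at a table and a row range, exactly as
      // TableInputFormatBase.getRecordReader() does for a single split.
      // "mytable", "info:name" and the row keys are made-up examples.
      TableRecordReaderImpl reader = new TableRecordReaderImpl();
      reader.setHTable(new HTable(new HBaseConfiguration(), "mytable"));
      reader.setInputColumns(new byte [][] { Bytes.toBytes("info:name") });
      reader.setStartRow(Bytes.toBytes("row0000"));
      reader.setEndRow(Bytes.toBytes("row9999"));
      reader.init();  // builds the scanner via restart(startRow)

      // The usual key/value pump, but outside any MapReduce machinery.
      ImmutableBytesWritable key = reader.createKey();
      Result value = reader.createValue();
      while (reader.next(key, value)) {
        System.out.println(Bytes.toStringBinary(key.get()) + " => " + value);
      }
      reader.close();
    }
  }

The same pattern applies to the mapreduce flavor via setScan()/init()/nextKeyValue(), with getCurrentKey()/getCurrentValue() in place of the passed-in key/value pair.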