HADOOP-1231. Add generics to Mapper and Reducer interfaces. Contributed by Tom White.
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@566798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9dedf26b0f
commit
e220809017
|
@ -48,7 +48,9 @@ import org.apache.log4j.Logger;
|
||||||
/**
|
/**
|
||||||
* Convert HBase tabular data into a format that is consumable by Map/Reduce
|
* Convert HBase tabular data into a format that is consumable by Map/Reduce
|
||||||
*/
|
*/
|
||||||
public class TableInputFormat implements InputFormat, JobConfigurable {
|
public class TableInputFormat
|
||||||
|
implements InputFormat<HStoreKey, KeyedDataArrayWritable>, JobConfigurable {
|
||||||
|
|
||||||
static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
|
static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,7 +66,7 @@ public class TableInputFormat implements InputFormat, JobConfigurable {
|
||||||
/**
|
/**
|
||||||
* Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
|
* Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
|
||||||
*/
|
*/
|
||||||
class TableRecordReader implements RecordReader {
|
class TableRecordReader implements RecordReader<HStoreKey, KeyedDataArrayWritable> {
|
||||||
private HScannerInterface m_scanner;
|
private HScannerInterface m_scanner;
|
||||||
private TreeMap<Text, byte[]> m_row; // current buffer
|
private TreeMap<Text, byte[]> m_row; // current buffer
|
||||||
private Text m_endRow;
|
private Text m_endRow;
|
||||||
|
@ -95,7 +97,7 @@ public class TableInputFormat implements InputFormat, JobConfigurable {
|
||||||
*
|
*
|
||||||
* @see org.apache.hadoop.mapred.RecordReader#createKey()
|
* @see org.apache.hadoop.mapred.RecordReader#createKey()
|
||||||
*/
|
*/
|
||||||
public WritableComparable createKey() {
|
public HStoreKey createKey() {
|
||||||
return new HStoreKey();
|
return new HStoreKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +106,7 @@ public class TableInputFormat implements InputFormat, JobConfigurable {
|
||||||
*
|
*
|
||||||
* @see org.apache.hadoop.mapred.RecordReader#createValue()
|
* @see org.apache.hadoop.mapred.RecordReader#createValue()
|
||||||
*/
|
*/
|
||||||
public Writable createValue() {
|
public KeyedDataArrayWritable createValue() {
|
||||||
return new KeyedDataArrayWritable();
|
return new KeyedDataArrayWritable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,17 +132,17 @@ public class TableInputFormat implements InputFormat, JobConfigurable {
|
||||||
* @return true if there was more data
|
* @return true if there was more data
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public boolean next(Writable key, Writable value) throws IOException {
|
public boolean next(HStoreKey key, KeyedDataArrayWritable value) throws IOException {
|
||||||
LOG.debug("start next");
|
LOG.debug("start next");
|
||||||
m_row.clear();
|
m_row.clear();
|
||||||
HStoreKey tKey = (HStoreKey)key;
|
HStoreKey tKey = key;
|
||||||
boolean hasMore = m_scanner.next(tKey, m_row);
|
boolean hasMore = m_scanner.next(tKey, m_row);
|
||||||
|
|
||||||
if(hasMore) {
|
if(hasMore) {
|
||||||
if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
|
if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
|
||||||
hasMore = false;
|
hasMore = false;
|
||||||
} else {
|
} else {
|
||||||
KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value;
|
KeyedDataArrayWritable rowVal = value;
|
||||||
ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
|
ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
|
||||||
|
|
||||||
for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
|
for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
|
||||||
|
@ -159,8 +161,8 @@ public class TableInputFormat implements InputFormat, JobConfigurable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
public RecordReader<HStoreKey, KeyedDataArrayWritable> getRecordReader(
|
||||||
public RecordReader getRecordReader(InputSplit split,
|
InputSplit split,
|
||||||
@SuppressWarnings("unused") JobConf job,
|
@SuppressWarnings("unused") JobConf job,
|
||||||
@SuppressWarnings("unused") Reporter reporter) throws IOException {
|
@SuppressWarnings("unused") Reporter reporter) throws IOException {
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,8 @@ import org.apache.log4j.Logger;
|
||||||
/**
|
/**
|
||||||
* Convert Map/Reduce output and write it to an HBase table
|
* Convert Map/Reduce output and write it to an HBase table
|
||||||
*/
|
*/
|
||||||
public class TableOutputFormat extends OutputFormatBase {
|
public class TableOutputFormat
|
||||||
|
extends OutputFormatBase<Text, KeyedDataArrayWritable> {
|
||||||
|
|
||||||
/** JobConf parameter that specifies the output table */
|
/** JobConf parameter that specifies the output table */
|
||||||
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
|
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
|
||||||
|
@ -56,7 +57,9 @@ public class TableOutputFormat extends OutputFormatBase {
|
||||||
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
|
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
|
||||||
* and write to an HBase table
|
* and write to an HBase table
|
||||||
*/
|
*/
|
||||||
protected class TableRecordWriter implements RecordWriter {
|
protected class TableRecordWriter
|
||||||
|
implements RecordWriter<Text, KeyedDataArrayWritable> {
|
||||||
|
|
||||||
private HTable m_table;
|
private HTable m_table;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,10 +80,10 @@ public class TableOutputFormat extends OutputFormatBase {
|
||||||
*
|
*
|
||||||
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
|
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
|
||||||
*/
|
*/
|
||||||
public void write(WritableComparable key, Writable value) throws IOException {
|
public void write(Text key, KeyedDataArrayWritable value) throws IOException {
|
||||||
LOG.debug("start write");
|
LOG.debug("start write");
|
||||||
Text tKey = (Text)key;
|
Text tKey = key;
|
||||||
KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value;
|
KeyedDataArrayWritable tValue = value;
|
||||||
KeyedData[] columns = tValue.get();
|
KeyedData[] columns = tValue.get();
|
||||||
|
|
||||||
// start transaction
|
// start transaction
|
||||||
|
|
Loading…
Reference in New Issue