HBASE-1558 deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates that into 'now' -- undo mistaken commit
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@787342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
88de4001bb
commit
3599514ff6
|
@ -43,9 +43,6 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
* space delimited list of columns
|
||||
*/
|
||||
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
|
||||
|
||||
public static final String TIME_RANGE_MAX = "hbase.mapred.timerange.max";
|
||||
public static final String TIME_RANGE_MIN = "hbase.mapred.timerange.min";
|
||||
|
||||
public void configure(JobConf job) {
|
||||
Path[] tableNames = FileInputFormat.getInputPaths(job);
|
||||
|
@ -56,11 +53,6 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
m_cols[i] = Bytes.toBytes(colNames[i]);
|
||||
}
|
||||
setInputColumns(m_cols);
|
||||
|
||||
String minArg = job.get(TIME_RANGE_MIN);
|
||||
String maxArg = job.get(TIME_RANGE_MAX);
|
||||
setTimeRange(Long.parseLong(minArg), Long.parseLong(maxArg));
|
||||
|
||||
try {
|
||||
setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
|
||||
} catch (Exception e) {
|
||||
|
@ -86,15 +78,5 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
if (colArg == null || colArg.length() == 0) {
|
||||
throw new IOException("expecting at least one column");
|
||||
}
|
||||
|
||||
// maxStamp must be higher then minStamp
|
||||
String minArg = job.get(TIME_RANGE_MIN);
|
||||
String maxArg = job.get(TIME_RANGE_MAX);
|
||||
if (minArg == null || minArg.length() == 0
|
||||
|| maxArg == null || maxArg.length() == 0
|
||||
|| Long.parseLong(maxArg) <= Long.parseLong(minArg)) {
|
||||
throw new IOException("invalid time stamp values");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.filter.StopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
|
@ -77,7 +77,6 @@ public abstract class TableInputFormatBase
|
|||
implements InputFormat<ImmutableBytesWritable, Result> {
|
||||
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
|
||||
private byte [][] inputColumns;
|
||||
private long minStamp, maxStamp;
|
||||
private HTable table;
|
||||
private TableRecordReader tableRecordReader;
|
||||
private RowFilterInterface rowFilter;
|
||||
|
@ -94,8 +93,7 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
private ResultScanner scanner;
|
||||
private HTable htable;
|
||||
private byte [][] trrInputColumns;
|
||||
private long minStamp, maxStamp;
|
||||
|
||||
|
||||
/**
|
||||
* Restart from survivable exceptions by creating a new scanner.
|
||||
*
|
||||
|
@ -103,33 +101,28 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void restart(byte[] firstRow) throws IOException {
|
||||
Scan scan = null;
|
||||
if ((endRow != null) && (endRow.length > 0)) {
|
||||
if (trrRowFilter != null) {
|
||||
final Set<RowFilterInterface> rowFiltersSet =
|
||||
new HashSet<RowFilterInterface>();
|
||||
rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
|
||||
rowFiltersSet.add(trrRowFilter);
|
||||
scan = new Scan(startRow);
|
||||
Scan scan = new Scan(startRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
// scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
|
||||
// rowFiltersSet));
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
} else {
|
||||
scan = new Scan(firstRow, endRow);
|
||||
|
||||
Scan scan = new Scan(firstRow, endRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
}
|
||||
} else {
|
||||
scan = new Scan(firstRow);
|
||||
Scan scan = new Scan(firstRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
// scan.setFilter(trrRowFilter);
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
}
|
||||
scan.addColumns(trrInputColumns);
|
||||
|
||||
if (minStamp !=0 && maxStamp !=0) { // if timestamps are ungiven.
|
||||
scan.setTimeRange(minStamp, maxStamp);
|
||||
scan.setMaxVersions();
|
||||
}
|
||||
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -154,11 +147,6 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
public void setInputColumns(final byte [][] inputColumns) {
|
||||
this.trrInputColumns = inputColumns;
|
||||
}
|
||||
|
||||
public void setTimeRange(long minStamp, long maxStamp) {
|
||||
this.minStamp = minStamp;
|
||||
this.maxStamp = maxStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param startRow the first row in the split
|
||||
|
@ -263,7 +251,6 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
trr.setEndRow(tSplit.getEndRow());
|
||||
trr.setHTable(this.table);
|
||||
trr.setInputColumns(this.inputColumns);
|
||||
trr.setTimeRange(this.minStamp, this.maxStamp);
|
||||
trr.setRowFilter(this.rowFilter);
|
||||
trr.init();
|
||||
return trr;
|
||||
|
@ -323,16 +310,6 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
this.inputColumns = inputColumns;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param minStamp
|
||||
* @param maxStam
|
||||
*/
|
||||
protected void setTimeRange(long minStamp, long maxStamp) {
|
||||
this.minStamp = minStamp;
|
||||
this.maxStamp = maxStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows subclasses to get the {@link HTable}.
|
||||
*/
|
||||
|
@ -367,5 +344,4 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
protected void setRowFilter(RowFilterInterface rowFilter) {
|
||||
this.rowFilter = rowFilter;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,39 +51,13 @@ public class TableMapReduceUtil {
|
|||
Class<? extends TableMap> mapper,
|
||||
Class<? extends WritableComparable> outputKeyClass,
|
||||
Class<? extends Writable> outputValueClass, JobConf job) {
|
||||
|
||||
initTableMapJob(table, columns, mapper, outputKeyClass,
|
||||
outputValueClass, job, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this before submitting a TableMap job. It will
|
||||
* appropriately set up the JocConf.
|
||||
*
|
||||
* @param table The table name to read from.
|
||||
* @param columns The columns to scan.
|
||||
* @param mapper The mapper class to use.
|
||||
* @param outputKeyClass The class of the output key.
|
||||
* @param outputValueClass The class of the output value.
|
||||
* @param job The current job configuration to adjust.
|
||||
* @param minStamp the minimum timestamp, inclusive
|
||||
* @param maxStamp the maximum timestamp, exclusive
|
||||
*/
|
||||
public static void initTableMapJob(String table, String columns,
|
||||
Class<? extends TableMap> mapper,
|
||||
Class<? extends WritableComparable> outputKeyClass,
|
||||
Class<? extends Writable> outputValueClass, JobConf job,
|
||||
long minStamp,
|
||||
long maxStamp) {
|
||||
|
||||
|
||||
job.setInputFormat(TableInputFormat.class);
|
||||
job.setMapOutputValueClass(outputValueClass);
|
||||
job.setMapOutputKeyClass(outputKeyClass);
|
||||
job.setMapperClass(mapper);
|
||||
FileInputFormat.addInputPaths(job, table);
|
||||
job.set(TableInputFormat.COLUMN_LIST, columns);
|
||||
job.setLong(TableInputFormat.TIME_RANGE_MIN, minStamp);
|
||||
job.setLong(TableInputFormat.TIME_RANGE_MAX, maxStamp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue