HBASE-1970 Export does one version only; make it configurable how many it does
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@883845 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7486a7efe2
commit
97eb0f0b6f
|
@ -198,6 +198,8 @@ Release 0.21.0 - Unreleased
|
||||||
(Jeremiah Jacquet via Stack)
|
(Jeremiah Jacquet via Stack)
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
HBASE-1971 Unit test the full WAL replay cycle
|
HBASE-1971 Unit test the full WAL replay cycle
|
||||||
|
HBASE-1970 Export does one version only; make it configurable how many
|
||||||
|
it does
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write
|
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write
|
||||||
|
|
|
@ -76,7 +76,6 @@ import org.apache.hadoop.io.WritableFactories;
|
||||||
*/
|
*/
|
||||||
public class Scan implements Writable {
|
public class Scan implements Writable {
|
||||||
private static final byte SCAN_VERSION = (byte)1;
|
private static final byte SCAN_VERSION = (byte)1;
|
||||||
|
|
||||||
private byte [] startRow = HConstants.EMPTY_START_ROW;
|
private byte [] startRow = HConstants.EMPTY_START_ROW;
|
||||||
private byte [] stopRow = HConstants.EMPTY_END_ROW;
|
private byte [] stopRow = HConstants.EMPTY_END_ROW;
|
||||||
private int maxVersions = 1;
|
private int maxVersions = 1;
|
||||||
|
@ -180,10 +179,14 @@ public class Scan implements Writable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get versions of columns only within the specified timestamp range,
|
* Get versions of columns only within the specified timestamp range,
|
||||||
* [minStamp, maxStamp).
|
* [minStamp, maxStamp). Note, default maximum versions to return is 1. If
|
||||||
|
* your time range spans more than one version and you want all versions
|
||||||
|
* returned, up the number of versions beyond the defaut.
|
||||||
* @param minStamp minimum timestamp value, inclusive
|
* @param minStamp minimum timestamp value, inclusive
|
||||||
* @param maxStamp maximum timestamp value, exclusive
|
* @param maxStamp maximum timestamp value, exclusive
|
||||||
* @throws IOException if invalid time range
|
* @throws IOException if invalid time range
|
||||||
|
* @see {@link #setMaxVersions()}
|
||||||
|
* @see {@link #setMaxVersions(int)}
|
||||||
*/
|
*/
|
||||||
public Scan setTimeRange(long minStamp, long maxStamp)
|
public Scan setTimeRange(long minStamp, long maxStamp)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -192,8 +195,13 @@ public class Scan implements Writable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get versions of columns with the specified timestamp.
|
* Get versions of columns with the specified timestamp. Note, default maximum
|
||||||
* @param timestamp version timestamp
|
* versions to return is 1. If your time range spans more than one version
|
||||||
|
* and you want all versions returned, up the number of versions beyond the
|
||||||
|
* defaut.
|
||||||
|
* @param timestamp version timestamp
|
||||||
|
* @see {@link #setMaxVersions()}
|
||||||
|
* @see {@link #setMaxVersions(int)}
|
||||||
*/
|
*/
|
||||||
public Scan setTimeStamp(long timestamp) {
|
public Scan setTimeStamp(long timestamp) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Export an HBase table.
|
* Export an HBase table.
|
||||||
|
@ -80,8 +81,17 @@ public class Export {
|
||||||
Job job = new Job(conf, NAME + "_" + tableName);
|
Job job = new Job(conf, NAME + "_" + tableName);
|
||||||
job.setJarByClass(Exporter.class);
|
job.setJarByClass(Exporter.class);
|
||||||
// TODO: Allow passing filter and subset of rows/columns.
|
// TODO: Allow passing filter and subset of rows/columns.
|
||||||
TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
|
Scan s = new Scan();
|
||||||
Exporter.class, null, null, job);
|
// Optional arguments.
|
||||||
|
int versions = args.length > 2? Integer.parseInt(args[2]): 1;
|
||||||
|
s.setMaxVersions(versions);
|
||||||
|
long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
|
||||||
|
long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
|
||||||
|
s.setTimeRange(startTime, endTime);
|
||||||
|
Log.info("verisons=" + versions + ", starttime=" + startTime +
|
||||||
|
", endtime=" + endTime);
|
||||||
|
TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
|
||||||
|
null, job);
|
||||||
// No reducers. Just write straight to output files.
|
// No reducers. Just write straight to output files.
|
||||||
job.setNumReduceTasks(0);
|
job.setNumReduceTasks(0);
|
||||||
job.setOutputFormatClass(SequenceFileOutputFormat.class);
|
job.setOutputFormatClass(SequenceFileOutputFormat.class);
|
||||||
|
@ -98,7 +108,8 @@ public class Export {
|
||||||
if (errorMsg != null && errorMsg.length() > 0) {
|
if (errorMsg != null && errorMsg.length() > 0) {
|
||||||
System.err.println("ERROR: " + errorMsg);
|
System.err.println("ERROR: " + errorMsg);
|
||||||
}
|
}
|
||||||
System.err.println("Usage: Export <tablename> <outputdir>");
|
System.err.println("Usage: Export <tablename> <outputdir> [<versions> " +
|
||||||
|
"[<starttime> [<endtime>]]]");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,6 +126,6 @@ public class Export {
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
Job job = createSubmittableJob(conf, otherArgs);
|
Job job = createSubmittableJob(conf, otherArgs);
|
||||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
System.exit(job.waitForCompletion(true)? 0 : 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,7 +102,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
||||||
public void restart(byte[] firstRow) throws IOException {
|
public void restart(byte[] firstRow) throws IOException {
|
||||||
Scan newScan = new Scan(scan);
|
Scan newScan = new Scan(scan);
|
||||||
newScan.setStartRow(firstRow);
|
newScan.setStartRow(firstRow);
|
||||||
this.scanner = this.htable.getScanner(newScan);
|
this.scanner = this.htable.getScanner(newScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1079,7 +1079,6 @@ public class HLog implements HConstants, Syncable {
|
||||||
SequenceFile.Reader in = null;
|
SequenceFile.Reader in = null;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
try {
|
try {
|
||||||
long len = fs.getFileStatus(logfiles[i].getPath()).getLen();
|
|
||||||
in = HLog.getReader(fs, logfiles[i].getPath(), conf);
|
in = HLog.getReader(fs, logfiles[i].getPath(), conf);
|
||||||
try {
|
try {
|
||||||
HLogKey key = newKey(conf);
|
HLogKey key = newKey(conf);
|
||||||
|
|
|
@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
public class TestStoreScanner extends TestCase {
|
public class TestStoreScanner extends TestCase {
|
||||||
|
private final String CF_STR = "cf";
|
||||||
final byte [] CF = Bytes.toBytes("cf");
|
final byte [] CF = Bytes.toBytes(CF_STR);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test utility for building a NavigableSet for scanners.
|
* Test utility for building a NavigableSet for scanners.
|
||||||
|
@ -50,6 +50,60 @@ public class TestStoreScanner extends TestCase {
|
||||||
}
|
}
|
||||||
return cols;
|
return cols;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testScanTimeRange() throws IOException {
|
||||||
|
String r1 = "R1";
|
||||||
|
// returns only 1 of these 2 even though same timestamp
|
||||||
|
KeyValue [] kvs = new KeyValue[] {
|
||||||
|
KeyValueTestUtil.create(r1, CF_STR, "a", 1, KeyValue.Type.Put, "dont-care"),
|
||||||
|
KeyValueTestUtil.create(r1, CF_STR, "a", 2, KeyValue.Type.Put, "dont-care"),
|
||||||
|
KeyValueTestUtil.create(r1, CF_STR, "a", 3, KeyValue.Type.Put, "dont-care"),
|
||||||
|
KeyValueTestUtil.create(r1, CF_STR, "a", 4, KeyValue.Type.Put, "dont-care"),
|
||||||
|
KeyValueTestUtil.create(r1, CF_STR, "a", 5, KeyValue.Type.Put, "dont-care"),
|
||||||
|
};
|
||||||
|
KeyValueScanner [] scanners = new KeyValueScanner[] {
|
||||||
|
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||||
|
};
|
||||||
|
Scan scanSpec = new Scan(Bytes.toBytes(r1));
|
||||||
|
scanSpec.setTimeRange(0, 6);
|
||||||
|
scanSpec.setMaxVersions();
|
||||||
|
StoreScanner scan =
|
||||||
|
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||||
|
KeyValue.COMPARATOR, getCols("a"), scanners);
|
||||||
|
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||||
|
assertEquals(true, scan.next(results));
|
||||||
|
assertEquals(5, results.size());
|
||||||
|
assertEquals(kvs[kvs.length - 1], results.get(0));
|
||||||
|
// Scan limited TimeRange
|
||||||
|
scanSpec = new Scan(Bytes.toBytes(r1));
|
||||||
|
scanSpec.setTimeRange(1, 3);
|
||||||
|
scanSpec.setMaxVersions();
|
||||||
|
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||||
|
KeyValue.COMPARATOR, getCols("a"), scanners);
|
||||||
|
results = new ArrayList<KeyValue>();
|
||||||
|
assertEquals(true, scan.next(results));
|
||||||
|
assertEquals(2, results.size());
|
||||||
|
// Another range.
|
||||||
|
scanSpec = new Scan(Bytes.toBytes(r1));
|
||||||
|
scanSpec.setTimeRange(5, 10);
|
||||||
|
scanSpec.setMaxVersions();
|
||||||
|
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||||
|
KeyValue.COMPARATOR, getCols("a"), scanners);
|
||||||
|
results = new ArrayList<KeyValue>();
|
||||||
|
assertEquals(true, scan.next(results));
|
||||||
|
assertEquals(1, results.size());
|
||||||
|
// See how TimeRange and Versions interact.
|
||||||
|
// Another range.
|
||||||
|
scanSpec = new Scan(Bytes.toBytes(r1));
|
||||||
|
scanSpec.setTimeRange(0, 10);
|
||||||
|
scanSpec.setMaxVersions(3);
|
||||||
|
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
|
||||||
|
KeyValue.COMPARATOR, getCols("a"), scanners);
|
||||||
|
results = new ArrayList<KeyValue>();
|
||||||
|
assertEquals(true, scan.next(results));
|
||||||
|
assertEquals(3, results.size());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void testScanSameTimestamp() throws IOException {
|
public void testScanSameTimestamp() throws IOException {
|
||||||
// returns only 1 of these 2 even though same timestamp
|
// returns only 1 of these 2 even though same timestamp
|
||||||
|
@ -58,8 +112,7 @@ public class TestStoreScanner extends TestCase {
|
||||||
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
|
KeyValueTestUtil.create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
|
||||||
};
|
};
|
||||||
KeyValueScanner [] scanners = new KeyValueScanner[] {
|
KeyValueScanner [] scanners = new KeyValueScanner[] {
|
||||||
new KeyValueScanFixture(KeyValue.COMPARATOR,
|
new KeyValueScanFixture(KeyValue.COMPARATOR, kvs)
|
||||||
kvs)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
|
||||||
|
|
Loading…
Reference in New Issue