HBASE-6694 Test scanner batching in export job feature from HBASE-6372 (Alexander Alten-Lorenz)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1428772 13f79535-47bb-0310-9956-ffa450edef68
commit b1e92a4746
parent 77e46a5141
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
 
     int batching = conf.getInt(EXPORT_BATCHING, -1);
     if (batching != -1){
-      try{
+      try {
         s.setBatch(batching);
-      } catch (RuntimeException e) {
-        LOG.error("Batching could not be set", e);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
       }
     }
     LOG.info("versions=" + versions + ", starttime=" + startTime +
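Note on the hunk above: Scan.setBatch(int) caps how many cells a scanner hands back per Result, and HBase rejects the setting when the scan carries a filter that must observe whole rows at once. IncompatibleFilterException is unchecked (it extends RuntimeException), which is why the old catch of RuntimeException also worked; catching the specific type simply narrows the handler. A minimal standalone sketch of the guard, assuming an HBase 0.94-era client on the classpath (class name and batch value are illustrative, not from the patch):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;

public class BatchingGuardSketch {
  public static void main(String[] args) {
    Scan s = new Scan();
    try {
      s.setBatch(100); // return at most 100 cells per Result
    } catch (IncompatibleFilterException e) {
      // Unchecked; thrown when the scan's filter needs to see whole rows.
      System.err.println("Batching could not be set: " + e.getMessage());
    }
  }
}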
@@ -59,7 +59,7 @@ public class TestImportExport {
   private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
   private static final byte[] QUAL = Bytes.toBytes("q");
   private static final String OUTPUT_DIR = "outputdir";
-  private static final String EXPORT_BATCHING = "100";
+  private static final String EXPORT_BATCH_SIZE = "100";
 
   private static MiniHBaseCluster cluster;
   private static long now = System.currentTimeMillis();
@@ -126,8 +126,7 @@ public class TestImportExport {
     String[] args = new String[] {
         EXPORT_TABLE,
         OUTPUT_DIR,
-        EXPORT_BATCHING,
-        "1000"
+        "1000", // max number of key versions per key to export
     };
 
     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
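The change above matches how Export reads its inputs after this patch: batching comes from a configuration property (read via conf.getInt(EXPORT_BATCHING, -1) in the Export hunk), while the positional arguments are the table, the output directory, and optionally the maximum number of key versions to export, so passing EXPORT_BATCHING positionally no longer makes sense. A small sketch of the property lookup; the key string is an assumption, since the patch only references it through the Export.EXPORT_BATCHING constant:

import org.apache.hadoop.conf.Configuration;

public class BatchingLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Simulates passing -D<key>=100 on the command line; the real key is
    // whatever Export.EXPORT_BATCHING resolves to.
    conf.set("hbase.export.scanner.batch", "100");
    int batching = conf.getInt("hbase.export.scanner.batch", -1);
    if (batching != -1) {
      System.out.println("scanner batch size: " + batching); // prints 100
    }
  }
}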
@@ -196,6 +195,47 @@ public class TestImportExport {
     assertTrue(job.isSuccessful());
   }
 
+  /**
+   * Test export scanner batching
+   */
+  @Test
+  public void testExportScannerBatching() throws Exception {
+    String BATCH_TABLE = "exportWithBatch";
+    HTableDescriptor desc = new HTableDescriptor(BATCH_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(1)
+    );
+    UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(UTIL.getConfiguration(), BATCH_TABLE);
+
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    p.add(FAMILYA, QUAL, now+3, QUAL);
+    p.add(FAMILYA, QUAL, now+4, QUAL);
+    t.put(p);
+
+    String[] args = new String[] {
+        "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
+        BATCH_TABLE,
+        OUTPUT_DIR
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    assertEquals(conf.get(Export.EXPORT_BATCHING), EXPORT_BATCH_SIZE);
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.getConfiguration().set("mapreduce.framework.name", "yarn");
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.delete(new Path(OUTPUT_DIR), true);
+  }
+
   @Test
   public void testWithDeletes() throws Exception {
     String EXPORT_TABLE = "exportWithDeletes";
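The new test leans on GenericOptionsParser's standard behavior: -Dname=value pairs are folded into the Configuration and stripped from the argument list, so only the table and output directory reach Export.createSubmittableJob. A self-contained sketch of that split, with an illustrative property key (requires hadoop-common on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserSketch {
  public static void main(String[] unused) throws Exception {
    String[] args = { "-Dexample.batch=100", "someTable", "outputdir" };
    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(), args);
    Configuration conf = opts.getConfiguration();  // now contains example.batch=100
    String[] remaining = opts.getRemainingArgs();  // { "someTable", "outputdir" }
    System.out.println(conf.get("example.batch")); // 100
    System.out.println(remaining.length);          // 2
  }
}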
@@ -225,8 +265,7 @@ public class TestImportExport {
         "-D" + Export.RAW_SCAN + "=true",
         EXPORT_TABLE,
         OUTPUT_DIR,
-        EXPORT_BATCHING,
-        "1000"
+        "1000", // max number of key versions per key to export
     };
 
     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
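For the hunk above: Export.RAW_SCAN switches the job's scan into raw mode, which also returns delete markers and deleted-but-not-yet-compacted cells, so the versions argument still matters for exports that intend to capture a table's full history. A minimal sketch of raw mode, assuming the 0.94-era client API (values mirror the test, not the patch itself):

import org.apache.hadoop.hbase.client.Scan;

public class RawScanSketch {
  public static void main(String[] args) {
    Scan s = new Scan();
    s.setRaw(true);         // include delete markers and not-yet-compacted deleted cells
    s.setMaxVersions(1000); // read all stored versions, mirroring the test's "1000"
    System.out.println("raw scan: " + s.isRaw());
  }
}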