HBASE-6330 TestImportExport has been failing against hadoop 0.23/2.0 profile
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1466256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
976aefb5c0
commit
d09a134ad8
|
@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -78,7 +79,7 @@ public class Export {
|
|||
job.setOutputFormatClass(SequenceFileOutputFormat.class);
|
||||
job.setOutputKeyClass(ImmutableBytesWritable.class);
|
||||
job.setOutputValueClass(Result.class);
|
||||
FileOutputFormat.setOutputPath(job, outputDir);
|
||||
FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs.
|
||||
return job;
|
||||
}
|
||||
|
||||
|
|
|
@ -1648,6 +1648,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
|
||||
forceChangeTaskLogDir();
|
||||
|
||||
// Tests were failing because this process used 6GB of virtual memory and was getting killed.
|
||||
// we up the VM usable so that processes don't get killed.
|
||||
conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
|
||||
|
||||
// Allow the user to override FS URI for this map-reduce cluster to use.
|
||||
mrCluster = new MiniMRCluster(servers,
|
||||
FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
|
||||
|
@ -1656,6 +1660,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
if (jobConf == null) {
|
||||
jobConf = mrCluster.createJobConf();
|
||||
}
|
||||
|
||||
jobConf.set("mapred.local.dir",
|
||||
conf.get("mapred.local.dir")); //Hadoop MiniMR overwrites this while it should not
|
||||
LOG.info("Mini mapreduce cluster started");
|
||||
|
@ -1664,14 +1669,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
conf.set("mapred.job.tracker", jobConf.get("mapred.job.tracker"));
|
||||
// this for mrv2 support; mr1 ignores this
|
||||
conf.set("mapreduce.framework.name", "yarn");
|
||||
String rmAdress = jobConf.get("yarn.resourcemanager.address");
|
||||
if (rmAdress != null) {
|
||||
conf.set("yarn.resourcemanager.address", rmAdress);
|
||||
String rmAddress = jobConf.get("yarn.resourcemanager.address");
|
||||
if (rmAddress != null) {
|
||||
conf.set("yarn.resourcemanager.address", rmAddress);
|
||||
}
|
||||
String schedulerAdress =
|
||||
String schedulerAddress =
|
||||
jobConf.get("yarn.resourcemanager.scheduler.address");
|
||||
if (schedulerAdress != null) {
|
||||
conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
|
||||
if (schedulerAddress != null) {
|
||||
conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
|
@ -51,9 +50,10 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Tests the table import and table export MR job functionality
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestImportExport {
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
@ -65,15 +65,16 @@ public class TestImportExport {
|
|||
private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
|
||||
private static final byte[] QUAL = Bytes.toBytes("q");
|
||||
private static final String OUTPUT_DIR = "outputdir";
|
||||
private static String FQ_OUTPUT_DIR;
|
||||
private static final String EXPORT_BATCH_SIZE = "100";
|
||||
|
||||
private static MiniHBaseCluster cluster;
|
||||
private static long now = System.currentTimeMillis();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
cluster = UTIL.startMiniCluster();
|
||||
UTIL.startMiniCluster();
|
||||
UTIL.startMiniMapReduceCluster();
|
||||
FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -90,24 +91,39 @@ public class TestImportExport {
|
|||
}
|
||||
|
||||
/**
|
||||
* When running on Hadoop 2, we need to copy (or add) configuration values for keys
|
||||
* that start with "yarn." (from the map reduce minicluster) to the
|
||||
* configuration that will be used during the test (from the HBase minicluster).
|
||||
* YARN configuration values are set properly in the map reduce minicluster,
|
||||
* but not necessarily in the HBase mini cluster.
|
||||
* @param srcConf the configuration to copy from (the map reduce minicluster version)
|
||||
* @param destConf the configuration to copy to (the HBase minicluster version)
|
||||
* Runs an export job with the specified command line args
|
||||
* @param args
|
||||
* @return true if job completed successfully
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
private void copyConfigurationValues(Configuration srcConf, Configuration destConf) {
|
||||
Iterator<Map.Entry<String,String>> it = srcConf.iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String, String> entry = it.next();
|
||||
String key = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
if (key.startsWith("yarn.") && !value.isEmpty()) {
|
||||
destConf.set(key, value);
|
||||
}
|
||||
}
|
||||
boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
|
||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.waitForCompletion(false);
|
||||
return job.isSuccessful();
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs an import job with the specified command line args
|
||||
* @param args
|
||||
* @return true if job completed successfully
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
|
||||
// need to make a copy of the configuration because to make sure different temp dirs are used.
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
Job job = Import.createSubmittableJob(conf, args);
|
||||
job.waitForCompletion(false);
|
||||
return job.isSuccessful();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,42 +147,19 @@ public class TestImportExport {
|
|||
|
||||
String[] args = new String[] {
|
||||
EXPORT_TABLE,
|
||||
OUTPUT_DIR,
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000", // max number of key versions per key to export
|
||||
};
|
||||
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
|
||||
// copy or add the necessary configuration values from the map reduce config to the hbase config
|
||||
copyConfigurationValues(UTIL.getConfiguration(), conf);
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
|
||||
assertTrue(runExport(args));
|
||||
|
||||
String IMPORT_TABLE = "importTableSimpleCase";
|
||||
t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
|
||||
args = new String[] {
|
||||
"-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
|
||||
IMPORT_TABLE,
|
||||
OUTPUT_DIR
|
||||
FQ_OUTPUT_DIR
|
||||
};
|
||||
|
||||
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
conf = opts.getConfiguration();
|
||||
|
||||
// copy or add the necessary configuration values from the map reduce config to the hbase config
|
||||
copyConfigurationValues(UTIL.getConfiguration(), conf);
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
job = Import.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
assertTrue(runImport(args));
|
||||
|
||||
Get g = new Get(ROW1);
|
||||
g.setMaxVersions();
|
||||
|
@ -186,19 +179,8 @@ public class TestImportExport {
|
|||
@Test
|
||||
public void testMetaExport() throws Exception {
|
||||
String EXPORT_TABLE = ".META.";
|
||||
String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1", "0", "0" };
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
|
||||
cluster.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
|
||||
// copy or add the necessary configuration values from the map reduce config to the hbase config
|
||||
copyConfigurationValues(UTIL.getConfiguration(), conf);
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
|
||||
assertTrue(runExport(args));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -225,21 +207,12 @@ public class TestImportExport {
|
|||
String[] args = new String[] {
|
||||
"-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg.
|
||||
BATCH_TABLE,
|
||||
OUTPUT_DIR
|
||||
FQ_OUTPUT_DIR
|
||||
};
|
||||
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
assertEquals(conf.get(Export.EXPORT_BATCHING), EXPORT_BATCH_SIZE);
|
||||
assertTrue(runExport(args));
|
||||
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
|
||||
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||
fs.delete(new Path(OUTPUT_DIR), true);
|
||||
fs.delete(new Path(FQ_OUTPUT_DIR), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -266,26 +239,14 @@ public class TestImportExport {
|
|||
d = new Delete(ROW1);
|
||||
d.deleteColumns(FAMILYA, QUAL, now+2);
|
||||
t.delete(d);
|
||||
|
||||
|
||||
String[] args = new String[] {
|
||||
"-D" + Export.RAW_SCAN + "=true",
|
||||
EXPORT_TABLE,
|
||||
OUTPUT_DIR,
|
||||
FQ_OUTPUT_DIR,
|
||||
"1000", // max number of key versions per key to export
|
||||
};
|
||||
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
|
||||
// copy or add the necessary configuration values from the map reduce config to the hbase config
|
||||
copyConfigurationValues(UTIL.getConfiguration(), conf);
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
|
||||
assertTrue(runExport(args));
|
||||
|
||||
String IMPORT_TABLE = "importWithDeletes";
|
||||
desc = new HTableDescriptor(IMPORT_TABLE);
|
||||
|
@ -298,20 +259,9 @@ public class TestImportExport {
|
|||
t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
|
||||
args = new String[] {
|
||||
IMPORT_TABLE,
|
||||
OUTPUT_DIR
|
||||
FQ_OUTPUT_DIR
|
||||
};
|
||||
|
||||
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
conf = opts.getConfiguration();
|
||||
|
||||
// copy or add the necessary configuration values from the map reduce config to the hbase config
|
||||
copyConfigurationValues(UTIL.getConfiguration(), conf);
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
job = Import.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
assertTrue(runImport(args));
|
||||
|
||||
Scan s = new Scan();
|
||||
s.setMaxVersions();
|
||||
|
@ -329,8 +279,13 @@ public class TestImportExport {
|
|||
t.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
|
||||
* attempt with invalid values.
|
||||
*/
|
||||
@Test
|
||||
public void testWithFilter() throws Exception {
|
||||
// Create simple table to export
|
||||
String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
|
||||
HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||
|
@ -345,18 +300,11 @@ public class TestImportExport {
|
|||
p.add(FAMILYA, QUAL, now + 4, QUAL);
|
||||
exportTable.put(p);
|
||||
|
||||
String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1000" };
|
||||
|
||||
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
|
||||
cluster.getConfiguration()), args);
|
||||
Configuration conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
Job job = Export.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
// Export the simple table
|
||||
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
|
||||
assertTrue(runExport(args));
|
||||
|
||||
// Import to a new table
|
||||
String IMPORT_TABLE = "importWithFilter";
|
||||
desc = new HTableDescriptor(IMPORT_TABLE);
|
||||
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
|
||||
|
@ -364,17 +312,9 @@ public class TestImportExport {
|
|||
|
||||
HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
|
||||
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
|
||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, OUTPUT_DIR,
|
||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
|
||||
"1000" };
|
||||
|
||||
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
job = Import.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertTrue(job.isSuccessful());
|
||||
assertTrue(runImport(args));
|
||||
|
||||
// get the count of the source table for that time range
|
||||
PrefixFilter filter = new PrefixFilter(ROW1);
|
||||
|
@ -388,16 +328,8 @@ public class TestImportExport {
|
|||
|
||||
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
|
||||
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
|
||||
OUTPUT_DIR, "1000" };
|
||||
|
||||
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||
conf = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
|
||||
job = Import.createSubmittableJob(conf, args);
|
||||
job.getConfiguration().set("mapreduce.framework.name", "yarn");
|
||||
job.waitForCompletion(false);
|
||||
assertFalse("Job succeeedd, but it had a non-instantiable filter!", job.isSuccessful());
|
||||
FQ_OUTPUT_DIR, "1000" };
|
||||
assertFalse(runImport(args));
|
||||
|
||||
// cleanup
|
||||
exportTable.close();
|
||||
|
|
Loading…
Reference in New Issue