HBASE-11104 IntegrationTestImportTsv#testRunFromOutputCommitter misses credential initialization (Vandana)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1596277 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b6646596c6
commit
71c1d02f02
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
|||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -257,7 +259,8 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
|
|||
// look a lot like TestImportTsv#testMROnTable.
|
||||
final String table = format("%s-%s-child", NAME, context.getJobID());
|
||||
final String cf = "FAM";
|
||||
|
||||
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
|
||||
conf.set(ImportTsv.CREDENTIALS_LOCATION, fileLocation);
|
||||
String[] args = {
|
||||
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
|
||||
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
|
||||
|
@ -271,7 +274,9 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
|
|||
} catch (Exception e) {
|
||||
throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
|
||||
} finally {
|
||||
util.deleteTable(table);
|
||||
if (util.getHBaseAdmin().tableExists(table)) {
|
||||
util.deleteTable(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -362,6 +367,10 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
|
|||
JobLaunchingOutputFormat.setOutputPath(job, outputPath);
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
addTestDependencyJars(job.getConfiguration());
|
||||
TableMapReduceUtil.initCredentials(job);
|
||||
JobClient jc = new JobClient(job.getConfiguration());
|
||||
job.getCredentials().addToken(new Text("my_mr_token"),
|
||||
jc.getDelegationToken(new Text("renewer")));
|
||||
|
||||
// Job launched by the OutputCommitter will fail if dependency jars are
|
||||
// not shipped properly.
|
||||
|
|
|
@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.mapreduce;
|
|||
|
||||
import static java.lang.String.format;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.mapreduce.Job;
|
|||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -83,6 +86,9 @@ public class ImportTsv extends Configured implements Tool {
|
|||
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
|
||||
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
|
||||
public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
|
||||
//This config is used to propagate credentials from parent MR jobs which launch
|
||||
//ImportTSV jobs. SEE IntegrationTestImportTsv.
|
||||
public final static String CREDENTIALS_LOCATION = "credentials_location";
|
||||
final static String DEFAULT_SEPARATOR = "\t";
|
||||
final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
|
||||
final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
|
||||
|
@ -394,7 +400,6 @@ public class ImportTsv extends Configured implements Tool {
|
|||
throws IOException, ClassNotFoundException {
|
||||
|
||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||
|
||||
// Support non-XML supported characters
|
||||
// by re-encoding the passed separator as a Base64 string.
|
||||
String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
|
||||
|
@ -416,9 +421,14 @@ public class ImportTsv extends Configured implements Tool {
|
|||
FileInputFormat.setInputPaths(job, inputDir);
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setMapperClass(mapperClass);
|
||||
|
||||
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
|
||||
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
|
||||
if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
|
||||
String fileLoc = conf.get(CREDENTIALS_LOCATION);
|
||||
Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
|
||||
job.getCredentials().addAll(cred);
|
||||
}
|
||||
|
||||
if (hfileOutPath != null) {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
LOG.warn(format("Table '%s' does not exist.", tableName));
|
||||
|
|
Loading…
Reference in New Issue