diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index a9688702f86..ecedf9580dc 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -83,12 +84,28 @@ public class SyncTable extends Configured implements Tool { super(conf); } + private void initCredentialsForHBase(String zookeeper, Job job) throws IOException { + Configuration peerConf = HBaseConfiguration.createClusterConf(job + .getConfiguration(), zookeeper); + if(peerConf.get("hbase.security.authentication").equals("kerberos")){ + TableMapReduceUtil.initCredentialsForCluster(job, peerConf); + } + } + public Job createSubmittableJob(String[] args) throws IOException { FileSystem fs = sourceHashDir.getFileSystem(getConf()); if (!fs.exists(sourceHashDir)) { throw new IOException("Source hash dir not found: " + sourceHashDir); } + Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name", + "syncTable_" + sourceTableName + "-" + targetTableName)); + Configuration jobConf = job.getConfiguration(); + if (jobConf.get("hadoop.security.authentication").equals("kerberos")) { + TokenCache.obtainTokensForNamenodes(job.getCredentials(), new + Path[] { sourceHashDir }, getConf()); + } + HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir); LOG.info("Read source hash manifest: " + tableHash); LOG.info("Read " + tableHash.partitions.size() + " partition keys"); @@ -118,18 +135,17 @@ public class SyncTable extends Configured implements Tool { + " found in the partitions file is " + tableHash.partitions.size()); } - Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name", - "syncTable_" + sourceTableName + "-" + targetTableName)); - Configuration jobConf = job.getConfiguration(); job.setJarByClass(HashTable.class); jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString()); jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName); jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName); if (sourceZkCluster != null) { jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster); + initCredentialsForHBase(sourceZkCluster, job); } if (targetZkCluster != null) { jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); + initCredentialsForHBase(targetZkCluster, job); } jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);