HBASE-15038 ExportSnapshot should support separate configs for source and destination

This commit is contained in:
Gary Helmling 2015-12-23 18:49:58 -08:00
parent a82f7fc94a
commit 9589a7d8be

View File

@ -89,6 +89,10 @@ import org.apache.hadoop.util.ToolRunner;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ExportSnapshot extends Configured implements Tool { public class ExportSnapshot extends Configured implements Tool {
public static final String NAME = "exportsnapshot"; public static final String NAME = "exportsnapshot";
/** Configuration prefix for overrides for the source filesystem */
public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
/** Configuration prefix for overrides for the destination filesystem */
public static final String CONF_DEST_PREFIX = NAME + ".to.";
private static final Log LOG = LogFactory.getLog(ExportSnapshot.class); private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
@ -141,6 +145,9 @@ public class ExportSnapshot extends Configured implements Tool {
@Override @Override
public void setup(Context context) throws IOException { public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
filesGroup = conf.get(CONF_FILES_GROUP); filesGroup = conf.get(CONF_FILES_GROUP);
@ -155,15 +162,15 @@ public class ExportSnapshot extends Configured implements Tool {
testFailures = conf.getBoolean(CONF_TEST_FAILURE, false); testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
try { try {
conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
inputFs = FileSystem.get(inputRoot.toUri(), conf); inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
} }
try { try {
conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
outputFs = FileSystem.get(outputRoot.toUri(), conf); outputFs = FileSystem.get(outputRoot.toUri(), destConf);
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
} }
@ -789,8 +796,12 @@ public class ExportSnapshot extends Configured implements Tool {
job.setNumReduceTasks(0); job.setNumReduceTasks(0);
// Acquire the delegation Tokens // Acquire the delegation Tokens
Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
TokenCache.obtainTokensForNamenodes(job.getCredentials(), TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { inputRoot, outputRoot }, conf); new Path[] { inputRoot }, srcConf);
Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outputRoot }, destConf);
// Run the MR Job // Run the MR Job
if (!job.waitForCompletion(true)) { if (!job.waitForCompletion(true)) {
@ -913,11 +924,13 @@ public class ExportSnapshot extends Configured implements Tool {
targetName = snapshotName; targetName = snapshotName;
} }
conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf); srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot); LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf); destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString()); LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false); boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
@ -1004,7 +1017,7 @@ public class ExportSnapshot extends Configured implements Tool {
// Step 4 - Verify snapshot integrity // Step 4 - Verify snapshot integrity
if (verifyTarget) { if (verifyTarget) {
LOG.info("Verify snapshot integrity"); LOG.info("Verify snapshot integrity");
verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir); verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
} }
LOG.info("Export Completed: " + targetName); LOG.info("Export Completed: " + targetName);