diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java index 29e71e79516..07e9fcce457 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.mapreduce; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.math.BigDecimal; import java.util.Arrays; @@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.snapshot.ExportSnapshot; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Counters; @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptions; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException.NoNodeException; /** @@ -154,30 +155,19 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { this.backupManager = backupManager; } + @Override public Job execute() throws Exception { // reflection preparation for private methods and fields Class classDistCp = org.apache.hadoop.tools.DistCp.class; - Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); - Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); - Method methodCreateInputFileListing = - classDistCp.getDeclaredMethod("createInputFileListing", Job.class); Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); - Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); - Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); - Field fieldJobFS = classDistCp.getDeclaredField("jobFS"); + Field fieldInputOptions = getInputOptionsField(classDistCp); Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); - methodCreateMetaFolderPath.setAccessible(true); - methodCreateJob.setAccessible(true); - methodCreateInputFileListing.setAccessible(true); methodCleanup.setAccessible(true); - fieldInputOptions.setAccessible(true); - fieldMetaFolder.setAccessible(true); - fieldJobFS.setAccessible(true); fieldSubmitted.setAccessible(true); // execute() logic starts here @@ -185,16 +175,8 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { Job job = null; try { - synchronized (this) { - // Don't cleanup while we are setting up. - fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); - fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf())); - job = (Job) methodCreateJob.invoke(this); - } - methodCreateInputFileListing.invoke(this, job); - // Get the total length of the source files - List srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); + List srcs = getSourcePaths(fieldInputOptions); long totalSrcLgth = 0; for (Path aSrc : srcs) { @@ -202,14 +184,9 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc); } - // submit the copy job - job.submit(); - fieldSubmitted.set(this, true); - - // after submit the MR job, set its handler in backup handler for cancel process - // this.backupHandler.copyJob = job; - - // Update the copy progress to ZK every 0.5s if progress value changed + // Async call + job = super.execute(); + // Update the copy progress to system table every 0.5s if progress value changed int progressReportFreq = MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency", 500); @@ -251,10 +228,6 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { } catch (Throwable t) { LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t); throw t; - } finally { - if (!fieldSubmitted.getBoolean(this)) { - methodCleanup.invoke(this); - } } String jobID = job.getJobID().toString(); @@ -271,6 +244,43 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { return job; } + private Field getInputOptionsField(Class classDistCp) throws IOException{ + Field f = null; + try { + f = classDistCp.getDeclaredField("inputOptions"); + } catch(Exception e) { + // Haddop 3 + try { + f = classDistCp.getDeclaredField("context"); + } catch (NoSuchFieldException | SecurityException e1) { + throw new IOException(e1); + } + } + return f; + } + + @SuppressWarnings("unchecked") + private List getSourcePaths(Field fieldInputOptions) throws IOException{ + Object options; + try { + options = fieldInputOptions.get(this); + if (options instanceof DistCpOptions) { + return ((DistCpOptions) options).getSourcePaths(); + } else { + // Hadoop 3 + Class classContext = Class.forName("org.apache.hadoop.tools.DistCpContext"); + Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths"); + methodGetSourcePaths.setAccessible(true); + + return (List) methodGetSourcePaths.invoke(options); + } + } catch (IllegalArgumentException | IllegalAccessException | + ClassNotFoundException | NoSuchMethodException | + SecurityException | InvocationTargetException e) { + throw new IOException(e); + } + + } } /** @@ -306,11 +316,14 @@ public class MapReduceBackupCopyJob implements BackupCopyJob { // We need to create the target dir before run distcp. LOG.debug("DistCp options: " + Arrays.toString(options)); Path dest = new Path(options[options.length - 1]); + String[] newOptions = new String[options.length + 1]; + System.arraycopy(options, 0, newOptions, 1, options.length); + newOptions[0] = "-async"; // run DisCp in async mode FileSystem destfs = dest.getFileSystem(conf); if (!destfs.exists(dest)) { destfs.mkdirs(dest); } - res = distcp.run(options); + res = distcp.run(newOptions); } return res;