HBASE-18975 Fix backup / restore hadoop3 incompatibility (Vladimir Rodionov)
This commit is contained in:
parent
b212bf936e
commit
19336cadce
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.mapreduce;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
|
@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo;
|
|||
import org.apache.hadoop.hbase.backup.BackupType;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupManager;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
|
||||
import org.apache.hadoop.mapreduce.Cluster;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
|
@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.JobID;
|
|||
import org.apache.hadoop.tools.DistCp;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
|
||||
/**
|
||||
|
@ -154,30 +155,19 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
this.backupManager = backupManager;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Job execute() throws Exception {
|
||||
|
||||
// reflection preparation for private methods and fields
|
||||
Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
|
||||
Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
|
||||
Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
|
||||
Method methodCreateInputFileListing =
|
||||
classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
|
||||
Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
|
||||
|
||||
Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
|
||||
Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
|
||||
Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
|
||||
Field fieldInputOptions = getInputOptionsField(classDistCp);
|
||||
Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
|
||||
|
||||
methodCreateMetaFolderPath.setAccessible(true);
|
||||
methodCreateJob.setAccessible(true);
|
||||
methodCreateInputFileListing.setAccessible(true);
|
||||
methodCleanup.setAccessible(true);
|
||||
|
||||
fieldInputOptions.setAccessible(true);
|
||||
fieldMetaFolder.setAccessible(true);
|
||||
fieldJobFS.setAccessible(true);
|
||||
fieldSubmitted.setAccessible(true);
|
||||
|
||||
// execute() logic starts here
|
||||
|
@ -185,16 +175,8 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
|
||||
Job job = null;
|
||||
try {
|
||||
synchronized (this) {
|
||||
// Don't cleanup while we are setting up.
|
||||
fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
|
||||
fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
|
||||
job = (Job) methodCreateJob.invoke(this);
|
||||
}
|
||||
methodCreateInputFileListing.invoke(this, job);
|
||||
|
||||
// Get the total length of the source files
|
||||
List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
|
||||
List<Path> srcs = getSourcePaths(fieldInputOptions);
|
||||
|
||||
long totalSrcLgth = 0;
|
||||
for (Path aSrc : srcs) {
|
||||
|
@ -202,14 +184,9 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
|
||||
}
|
||||
|
||||
// submit the copy job
|
||||
job.submit();
|
||||
fieldSubmitted.set(this, true);
|
||||
|
||||
// after submit the MR job, set its handler in backup handler for cancel process
|
||||
// this.backupHandler.copyJob = job;
|
||||
|
||||
// Update the copy progress to ZK every 0.5s if progress value changed
|
||||
// Async call
|
||||
job = super.execute();
|
||||
// Update the copy progress to system table every 0.5s if progress value changed
|
||||
int progressReportFreq =
|
||||
MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
|
||||
500);
|
||||
|
@ -251,10 +228,6 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
} catch (Throwable t) {
|
||||
LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
|
||||
throw t;
|
||||
} finally {
|
||||
if (!fieldSubmitted.getBoolean(this)) {
|
||||
methodCleanup.invoke(this);
|
||||
}
|
||||
}
|
||||
|
||||
String jobID = job.getJobID().toString();
|
||||
|
@ -271,6 +244,43 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
return job;
|
||||
}
|
||||
|
||||
private Field getInputOptionsField(Class<?> classDistCp) throws IOException{
|
||||
Field f = null;
|
||||
try {
|
||||
f = classDistCp.getDeclaredField("inputOptions");
|
||||
} catch(Exception e) {
|
||||
// Haddop 3
|
||||
try {
|
||||
f = classDistCp.getDeclaredField("context");
|
||||
} catch (NoSuchFieldException | SecurityException e1) {
|
||||
throw new IOException(e1);
|
||||
}
|
||||
}
|
||||
return f;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException{
|
||||
Object options;
|
||||
try {
|
||||
options = fieldInputOptions.get(this);
|
||||
if (options instanceof DistCpOptions) {
|
||||
return ((DistCpOptions) options).getSourcePaths();
|
||||
} else {
|
||||
// Hadoop 3
|
||||
Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
|
||||
Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
|
||||
methodGetSourcePaths.setAccessible(true);
|
||||
|
||||
return (List<Path>) methodGetSourcePaths.invoke(options);
|
||||
}
|
||||
} catch (IllegalArgumentException | IllegalAccessException |
|
||||
ClassNotFoundException | NoSuchMethodException |
|
||||
SecurityException | InvocationTargetException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -306,11 +316,14 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
|
|||
// We need to create the target dir before run distcp.
|
||||
LOG.debug("DistCp options: " + Arrays.toString(options));
|
||||
Path dest = new Path(options[options.length - 1]);
|
||||
String[] newOptions = new String[options.length + 1];
|
||||
System.arraycopy(options, 0, newOptions, 1, options.length);
|
||||
newOptions[0] = "-async"; // run DisCp in async mode
|
||||
FileSystem destfs = dest.getFileSystem(conf);
|
||||
if (!destfs.exists(dest)) {
|
||||
destfs.mkdirs(dest);
|
||||
}
|
||||
res = distcp.run(options);
|
||||
res = distcp.run(newOptions);
|
||||
}
|
||||
return res;
|
||||
|
||||
|
|
Loading…
Reference in New Issue