HBASE-18975 Fix backup / restore hadoop3 incompatibility (Vladimir Rodionov)

tedyu 2017-10-11 12:26:34 -07:00
parent b4ed130083
commit c4ced0b3d5
1 changed file with 49 additions and 36 deletions
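HBase's MapReduceBackupCopyJob drives DistCp by reflecting into DistCp's private members. Hadoop 3 reworked those internals: the private DistCpOptions field named "inputOptions" was replaced by a "context" field holding an org.apache.hadoop.tools.DistCpContext, which broke the Hadoop 2-era reflection. The patch below stops hand-rolling job setup and submission (createMetaFolderPath / createJob / createInputFileListing / submit) in favor of calling super.execute() with DistCp's -async flag, and resolves the options field by probing for both names. A distilled, self-contained sketch of that probe follows; the helper names DistCpCompat and sourcePaths are illustrative, not from the patch:

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

final class DistCpCompat {
  // Find DistCp's private options holder under its Hadoop 2 name ("inputOptions",
  // a DistCpOptions) or its Hadoop 3 name ("context", a DistCpContext), then pull
  // the source paths out of whichever was found.
  @SuppressWarnings("unchecked")
  static List<Path> sourcePaths(DistCp distcp) throws IOException {
    try {
      Field f;
      try {
        f = DistCp.class.getDeclaredField("inputOptions"); // Hadoop 2
      } catch (NoSuchFieldException e) {
        f = DistCp.class.getDeclaredField("context"); // Hadoop 3
      }
      f.setAccessible(true);
      Object holder = f.get(distcp);
      if (holder instanceof DistCpOptions) {
        return ((DistCpOptions) holder).getSourcePaths();
      }
      // Hadoop 3: DistCpContext is reached reflectively so this still compiles
      // against Hadoop 2, where the class does not exist.
      Method m = holder.getClass().getDeclaredMethod("getSourcePaths");
      m.setAccessible(true);
      return (List<Path>) m.invoke(holder);
    } catch (ReflectiveOperationException e) {
      throw new IOException(e);
    }
  }
}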

org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.mapreduce;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.Arrays;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
@@ -154,30 +155,19 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
this.backupManager = backupManager;
}
@Override
public Job execute() throws Exception {
// reflection preparation for private methods and fields
Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
-Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
-Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
-Method methodCreateInputFileListing =
-classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
-Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
-Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
-Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+Field fieldInputOptions = getInputOptionsField(classDistCp);
Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
-methodCreateMetaFolderPath.setAccessible(true);
-methodCreateJob.setAccessible(true);
-methodCreateInputFileListing.setAccessible(true);
methodCleanup.setAccessible(true);
fieldInputOptions.setAccessible(true);
-fieldMetaFolder.setAccessible(true);
-fieldJobFS.setAccessible(true);
fieldSubmitted.setAccessible(true);
// execute() logic starts here
@@ -185,16 +175,8 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
Job job = null;
try {
-synchronized (this) {
-// Don't cleanup while we are setting up.
-fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
-fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
-job = (Job) methodCreateJob.invoke(this);
-}
-methodCreateInputFileListing.invoke(this, job);
// Get the total length of the source files
-List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+List<Path> srcs = getSourcePaths(fieldInputOptions);
long totalSrcLgth = 0;
for (Path aSrc : srcs) {
@@ -202,14 +184,9 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
}
-// submit the copy job
-job.submit();
-fieldSubmitted.set(this, true);
-// after submit the MR job, set its handler in backup handler for cancel process
-// this.backupHandler.copyJob = job;
-// Update the copy progress to ZK every 0.5s if progress value changed
+// Async call
+job = super.execute();
+// Update the copy progress to system table every 0.5s if progress value changed
int progressReportFreq =
MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
500);
@@ -251,10 +228,6 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
} catch (Throwable t) {
LOG.error("distcp " + (job == null ? "" : job.getJobID()) + " encountered error", t);
throw t;
-} finally {
-if (!fieldSubmitted.getBoolean(this)) {
-methodCleanup.invoke(this);
-}
}
String jobID = job.getJobID().toString();
@@ -271,6 +244,43 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
return job;
}
+private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
+Field f = null;
+try {
+f = classDistCp.getDeclaredField("inputOptions");
+} catch (Exception e) {
+// Hadoop 3
+try {
+f = classDistCp.getDeclaredField("context");
+} catch (NoSuchFieldException | SecurityException e1) {
+throw new IOException(e1);
+}
+}
+return f;
+}
+@SuppressWarnings("unchecked")
+private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
+Object options;
+try {
+options = fieldInputOptions.get(this);
+if (options instanceof DistCpOptions) {
+return ((DistCpOptions) options).getSourcePaths();
+} else {
+// Hadoop 3
+Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
+Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
+methodGetSourcePaths.setAccessible(true);
+return (List<Path>) methodGetSourcePaths.invoke(options);
+}
+} catch (IllegalArgumentException | IllegalAccessException |
+ClassNotFoundException | NoSuchMethodException |
+SecurityException | InvocationTargetException e) {
+throw new IOException(e);
+}
+}
}
/**
@@ -306,11 +316,14 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
// We need to create the target dir before run distcp.
LOG.debug("DistCp options: " + Arrays.toString(options));
Path dest = new Path(options[options.length - 1]);
+String[] newOptions = new String[options.length + 1];
+System.arraycopy(options, 0, newOptions, 1, options.length);
+newOptions[0] = "-async"; // run DistCp in async mode
FileSystem destfs = dest.getFileSystem(conf);
if (!destfs.exists(dest)) {
destfs.mkdirs(dest);
}
-res = distcp.run(options);
+res = distcp.run(newOptions);
}
return res;
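
Note on the -async flag added above: it makes DistCp's Tool-style run() return as soon as the MapReduce job is submitted rather than blocking until completion, which is what lets execute() hand back a live Job to the progress-polling and cancellation logic earlier in the file. A minimal sketch of that invocation style (the class name and paths are placeholders, not from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.util.ToolRunner;

public final class AsyncDistCpExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Passing null options: DistCp parses them from the argument array instead,
    // exactly as the patched code does via distcp.run(newOptions).
    int rc = ToolRunner.run(conf, new DistCp(conf, null),
        new String[] { "-async", "hdfs:///backup/src", "hdfs:///backup/dst" });
    System.exit(rc);
  }
}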