MAPREDUCE-7059. Downward Compatibility issue: MR job fails because of unknown setErasureCodingPolicy method from 3.x client to HDFS 2.x cluster. Contributed by Jiandan Yang.

This commit is contained in:
Weiwei Yang 2018-03-01 10:18:53 +08:00
parent 17f387e3c2
commit 6e6945cd78
1 changed files with 22 additions and 7 deletions

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -171,7 +173,7 @@ class JobResourceUploader {
if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED, if (!conf.getBoolean(MRJobConfig.MR_AM_STAGING_DIR_ERASURECODING_ENABLED,
MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) { MRJobConfig.DEFAULT_MR_AM_STAGING_ERASURECODING_ENABLED)) {
disableErasureCodingForPath(jtFs, submitJobDir); disableErasureCodingForPath(submitJobDir);
} }
// Get the resources that have been added via command line arguments in the // Get the resources that have been added via command line arguments in the
@ -874,13 +876,26 @@ class JobResourceUploader {
return finalPath; return finalPath;
} }
private void disableErasureCodingForPath(FileSystem fs, Path path) private void disableErasureCodingForPath(Path path)
throws IOException { throws IOException {
try {
if (jtFs instanceof DistributedFileSystem) { if (jtFs instanceof DistributedFileSystem) {
LOG.info("Disabling Erasure Coding for path: " + path); LOG.info("Disabling Erasure Coding for path: " + path);
DistributedFileSystem dfs = (DistributedFileSystem) jtFs; DistributedFileSystem dfs = (DistributedFileSystem) jtFs;
dfs.setErasureCodingPolicy(path, dfs.setErasureCodingPolicy(path,
SystemErasureCodingPolicies.getReplicationPolicy().getName()); SystemErasureCodingPolicies.getReplicationPolicy().getName());
} }
} catch (RemoteException e) {
if (!RpcNoSuchMethodException.class.getName().equals(e.getClassName())) {
throw e;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Ignore disabling erasure coding for path {} because method "
+ "disableErasureCodingForPath doesn't exist, probably "
+ "talking to a lower version HDFS.", path.toString(), e);
}
}
}
} }
} }