HBASE-14280 Bulk Upload from HA cluster to remote HA hbase cluster fails (Ankit Singhal)

This commit is contained in:
tedyu 2015-09-22 09:21:05 -07:00
parent 73ec3fdd5c
commit 4b5dd8ee5a
1 changed files with 23 additions and 2 deletions

View File

@ -27,6 +27,7 @@ import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
@ -66,10 +67,19 @@ public class FSHDFSUtils extends FSUtils {
dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
}
if (getNNAddressesMethod == null) {
try {
// getNNServiceRpcAddressesForCluster is available only in version
// equal to or later than Hadoop 2.6
getNNAddressesMethod =
dfsUtilClazz.getMethod("getNNServiceRpcAddressesForCluster", Configuration.class);
} catch (NoSuchMethodException e) {
// If hadoop version is older than hadoop 2.6
getNNAddressesMethod =
dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
}
}
Map<String, Map<String, InetSocketAddress>> addressMap =
(Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
.invoke(null, conf);
@ -115,6 +125,17 @@ public class FSHDFSUtils extends FSUtils {
if (srcServiceName.equals(desServiceName)) {
return true;
}
if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
Collection<String> internalNameServices =
conf.getTrimmedStringCollection("dfs.internal.nameservices");
if (!internalNameServices.isEmpty()) {
if (internalNameServices.contains(srcServiceName.split(":")[1])) {
return true;
} else {
return false;
}
}
}
if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
//If one serviceName is an HA format while the other is a non-HA format,
// maybe they refer to the same FileSystem.