HBASE-8304 Bulkload fails to remove files if fs.default.name / fs.defaultFS is configured without default port (Haosdent)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1575590 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2014-03-08 18:48:55 +00:00
parent 6563749a4c
commit ae1abc8b1b
3 changed files with 134 additions and 2 deletions

View File

@ -49,8 +49,8 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
/** /**
* View to an on-disk Region. * View to an on-disk Region.
@ -403,7 +403,7 @@ public class HRegionFileSystem {
// We can't compare FileSystem instances as equals() includes UGI instance // We can't compare FileSystem instances as equals() includes UGI instance
// as part of the comparison and won't work when doing SecureBulkLoad // as part of the comparison and won't work when doing SecureBulkLoad
// TODO deal with viewFS // TODO deal with viewFS
if (!srcFs.getUri().equals(desFs.getUri())) { if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination store. Copying file over to destination filesystem."); "the destination store. Copying file over to destination filesystem.");
Path tmpPath = createTempName(); Path tmpPath = createTempName();

View File

@ -22,7 +22,13 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -41,6 +47,82 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils { public class FSHDFSUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class); private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
// Reflection handles for DFSUtil.getNNServiceRpcAddresses(Configuration), resolved lazily on
// first use and then reused. The lookup is reflective so that this class still loads when
// that method is not available in the Hadoop version on the classpath.
private static Class<?> dfsUtilClazz;
private static Method getNNAddressesMethod;

/**
 * Collects the namenode RPC addresses that back the given filesystem.
 * <p>
 * When the canonical service name starts with "ha-hdfs", the addresses are obtained by
 * reflectively invoking {@code DFSUtil.getNNServiceRpcAddresses(conf)}; a failure there is
 * logged and yields whatever was collected so far (possibly nothing). Otherwise a single
 * address is built from the host and port of {@code fs.getUri()}. If that URI carries no
 * explicit port, the port is taken from the canonical service name instead, because
 * {@link InetSocketAddress} rejects a negative port.
 *
 * @param fs the distributed filesystem to inspect
 * @param conf configuration handed to {@code DFSUtil.getNNServiceRpcAddresses}
 * @return A set containing all namenode addresses of fs; empty if none could be determined
 */
private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
    Configuration conf) {
  Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
  String serviceName = fs.getCanonicalServiceName();
  if (serviceName == null) {
    // Nothing to resolve against; the caller treats an empty set as "no match".
    return addresses;
  }

  if (serviceName.startsWith("ha-hdfs")) {
    try {
      if (dfsUtilClazz == null) {
        dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
      }
      if (getNNAddressesMethod == null) {
        getNNAddressesMethod =
            dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
      }

      // The reflective call returns Object; the shape below is what the loop relies on.
      @SuppressWarnings("unchecked")
      Map<String, Map<String, InetSocketAddress>> addressMap =
          (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
              .invoke(null, conf);
      for (Map<String, InetSocketAddress> nnMap : addressMap.values()) {
        addresses.addAll(nnMap.values());
      }
    } catch (Exception e) {
      // Best effort: reflection or invocation problems must not fail the caller.
      LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
    }
  } else {
    URI uri = fs.getUri();
    String host = uri.getHost();
    int port = uri.getPort();
    if (port < 0) {
      // URI.getPort() is -1 when the filesystem URI omits the port. The canonical service
      // name is expected to carry "host:port", so recover the port from there.
      int idx = serviceName.lastIndexOf(':');
      if (idx >= 0) {
        try {
          port = Integer.parseInt(serviceName.substring(idx + 1));
        } catch (NumberFormatException e) {
          LOG.warn("Unable to parse port from serviceName=" + serviceName, e);
        }
      }
    }
    if (host == null || port < 0) {
      // InetSocketAddress would throw IllegalArgumentException for these inputs.
      LOG.warn("Unable to determine namenode address. uri=" + uri
          + ", serviceName=" + serviceName);
    } else {
      addresses.add(new InetSocketAddress(host, port));
    }
  }

  return addresses;
}
/**
 * Decides whether two filesystems refer to the same HDFS instance.
 * <p>
 * The canonical service names are compared first. If they differ and both filesystems are
 * {@link DistributedFileSystem} instances, the filesystems are considered the same when
 * their namenode address sets share at least one address.
 *
 * @param conf the Configuration of HBase
 * @param srcFs the source filesystem
 * @param desFs the destination filesystem
 * @return Whether srcFs and desFs are on same hdfs or not; {@code false} when either
 *         canonical service name is null
 */
public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
  // Canonical service names give both sides a unified format containing
  // scheme, host and port, so they can be compared directly.
  final String srcName = srcFs.getCanonicalServiceName();
  final String desName = desFs.getCanonicalServiceName();

  if (srcName == null || desName == null) {
    return false;
  }
  if (srcName.equals(desName)) {
    return true;
  }

  // Address-level comparison is only available for DistributedFileSystem.
  if (!(srcFs instanceof DistributedFileSystem) || !(desFs instanceof DistributedFileSystem)) {
    return false;
  }

  // One name may be in HA format while the other is not, yet both may still point at the
  // same filesystem, e.g. "ha-hdfs://nameservices" versus "hdfs://activeNamenode:port".
  Set<InetSocketAddress> srcNameNodes = getNNAddresses((DistributedFileSystem) srcFs, conf);
  Set<InetSocketAddress> desNameNodes = getNNAddresses((DistributedFileSystem) desFs, conf);
  return !Sets.intersection(srcNameNodes, desNameNodes).isEmpty();
}
/** /**
* Recover the lease from HDFS, retrying multiple times. * Recover the lease from HDFS, retrying multiple times.

View File

@ -21,8 +21,12 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -36,6 +40,7 @@ import org.mockito.Mockito;
*/ */
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestFSHDFSUtils { public class TestFSHDFSUtils {
private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
static { static {
Configuration conf = HTU.getConfiguration(); Configuration conf = HTU.getConfiguration();
@ -94,6 +99,51 @@ public class TestFSHDFSUtils {
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE); Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
} }
/**
 * Exercises {@link FSHDFSUtils#isSameHdfs} with plain and HA-style filesystem settings.
 * The test returns early, without asserting anything, when
 * {@code DFSUtil.getNNServiceRpcAddresses(Configuration)} cannot be resolved.
 */
@Test
public void testIsSameHdfs() throws IOException {
  try {
    // Probe for the reflective entry point the HA comparison depends on.
    Class.forName("org.apache.hadoop.hdfs.DFSUtil")
        .getMethod("getNNServiceRpcAddresses", Configuration.class);
  } catch (Exception e) {
    LOG.info("Skip testIsSameHdfs test case because of the no-HA hadoop version.");
    return;
  }

  Configuration conf = HBaseConfiguration.create();
  FileSystem source = new Path("hdfs://localhost:8020/").getFileSystem(conf);

  // Same host written differently, destination without an explicit port: expected same.
  FileSystem target = new Path("hdfs://127.0.0.1/").getFileSystem(conf);
  assertTrue(FSHDFSUtils.isSameHdfs(conf, source, target));

  // Same host, different port: expected different.
  target = new Path("hdfs://127.0.0.1:8070/").getFileSystem(conf);
  assertTrue(!FSHDFSUtils.isSameHdfs(conf, source, target));

  // Same port, different host: expected different.
  target = new Path("hdfs://127.0.1.1:8020/").getFileSystem(conf);
  assertTrue(!FSHDFSUtils.isSameHdfs(conf, source, target));

  // Switch the default filesystem to an HA nameservice with two namenodes.
  conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
  conf.set("dfs.nameservices", "haosong-hadoop");
  conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
  conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
      "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

  // One namenode address matches the source address: expected same.
  conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
  conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
  target = new Path("/").getFileSystem(conf);
  assertTrue(FSHDFSUtils.isSameHdfs(conf, source, target));

  // Neither namenode address matches host and port together: expected different.
  conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:8020");
  conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
  target = new Path("/").getFileSystem(conf);
  assertTrue(!FSHDFSUtils.isSameHdfs(conf, source, target));
}
/** /**
* Version of DFS that has HDFS-4525 in it. * Version of DFS that has HDFS-4525 in it.
*/ */