HADOOP-12159. Move DistCpUtils#compareFs() to org.apache.hadoop.fs.FileUtil and fix for HA namespaces (rchiang via rkanter)

This commit is contained in:
Robert Kanter 2015-06-30 16:42:59 -07:00
parent ad60807238
commit aaafa0b2ee
5 changed files with 155 additions and 44 deletions

View File

@ -910,6 +910,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-10798. globStatus() should always return a sorted list of files HADOOP-10798. globStatus() should always return a sorted list of files
(cmccabe) (cmccabe)
HADOOP-12159. Move DistCpUtils#compareFs() to org.apache.hadoop.fs.FileUtil
and fix for HA namespaces (rchiang via rkanter)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.*; import java.io.*;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Enumeration; import java.util.Enumeration;
@ -1329,4 +1332,43 @@ public class FileUtil {
unexpandedWildcardClasspath.toString()}; unexpandedWildcardClasspath.toString()};
return jarCp; return jarCp;
} }
public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
if (srcFs==null || destFs==null) {
return false;
}
URI srcUri = srcFs.getUri();
URI dstUri = destFs.getUri();
if (srcUri.getScheme()==null) {
return false;
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost!=null) && (dstHost!=null)) {
if (srcHost.equals(dstHost)) {
return srcUri.getPort()==dstUri.getPort();
}
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch (UnknownHostException ue) {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not compare file-systems. Unknown host: ", ue);
}
return false;
}
if (!srcHost.equals(dstHost)) {
return false;
}
} else if (srcHost==null && dstHost!=null) {
return false;
} else if (srcHost!=null) {
return false;
}
// check for ports
return srcUri.getPort()==dstUri.getPort();
}
} }

View File

@ -17,7 +17,8 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import org.junit.Before; import org.junit.*;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -25,8 +26,11 @@ import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -44,10 +48,12 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.tools.tar.TarEntry; import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarOutputStream; import org.apache.tools.tar.TarOutputStream;
import org.junit.After;
import org.junit.Assert; import javax.print.attribute.URISyntax;
import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFileUtil { public class TestFileUtil {
private static final Log LOG = LogFactory.getLog(TestFileUtil.class); private static final Log LOG = LogFactory.getLog(TestFileUtil.class);
@ -64,6 +70,25 @@ public class TestFileUtil {
private final File dir2 = new File(del, DIR + "2"); private final File dir2 = new File(del, DIR + "2");
private final File partitioned = new File(TEST_DIR, "partitioned"); private final File partitioned = new File(TEST_DIR, "partitioned");
private InetAddress inet1;
private InetAddress inet2;
private InetAddress inet3;
private InetAddress inet4;
private InetAddress inet5;
private InetAddress inet6;
private URI uri1;
private URI uri2;
private URI uri3;
private URI uri4;
private URI uri5;
private URI uri6;
private FileSystem fs1;
private FileSystem fs2;
private FileSystem fs3;
private FileSystem fs4;
private FileSystem fs5;
private FileSystem fs6;
/** /**
* Creates multiple directories for testing. * Creates multiple directories for testing.
* *
@ -80,6 +105,7 @@ public class TestFileUtil {
* file: part-r-00000, contents: "foo" * file: part-r-00000, contents: "foo"
* file: part-r-00001, contents: "bar" * file: part-r-00001, contents: "bar"
*/ */
@Ignore
private void setupDirs() throws IOException { private void setupDirs() throws IOException {
Assert.assertFalse(del.exists()); Assert.assertFalse(del.exists());
Assert.assertFalse(tmp.exists()); Assert.assertFalse(tmp.exists());
@ -1096,4 +1122,82 @@ public class TestFileUtil {
} }
} }
} }
@Ignore
public void setupCompareFs() {
// Set up Strings
String host1 = "1.2.3.4";
String host2 = "2.3.4.5";
int port1 = 7000;
int port2 = 7001;
String uris1 = "hdfs://" + host1 + ":" + Integer.toString(port1) + "/tmp/foo";
String uris2 = "hdfs://" + host1 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris3 = "hdfs://" + host2 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris4 = "hdfs://" + host2 + ":" + Integer.toString(port2) + "/tmp/foo";
String uris5 = "file:///" + host1 + ":" + Integer.toString(port1) + "/tmp/foo";
String uris6 = "hdfs:///" + host1 + "/tmp/foo";
// Set up URI objects
try {
uri1 = new URI(uris1);
uri2 = new URI(uris2);
uri3 = new URI(uris3);
uri4 = new URI(uris4);
uri5 = new URI(uris5);
uri6 = new URI(uris6);
} catch (URISyntaxException use) {
}
// Set up InetAddress
inet1 = mock(InetAddress.class);
when(inet1.getCanonicalHostName()).thenReturn(host1);
inet2 = mock(InetAddress.class);
when(inet2.getCanonicalHostName()).thenReturn(host1);
inet3 = mock(InetAddress.class);
when(inet3.getCanonicalHostName()).thenReturn(host2);
inet4 = mock(InetAddress.class);
when(inet4.getCanonicalHostName()).thenReturn(host2);
inet5 = mock(InetAddress.class);
when(inet5.getCanonicalHostName()).thenReturn(host1);
inet6 = mock(InetAddress.class);
when(inet6.getCanonicalHostName()).thenReturn(host1);
// Link of InetAddress to corresponding URI
try {
when(InetAddress.getByName(uris1)).thenReturn(inet1);
when(InetAddress.getByName(uris2)).thenReturn(inet2);
when(InetAddress.getByName(uris3)).thenReturn(inet3);
when(InetAddress.getByName(uris4)).thenReturn(inet4);
when(InetAddress.getByName(uris5)).thenReturn(inet5);
} catch (UnknownHostException ue) {
}
fs1 = mock(FileSystem.class);
when(fs1.getUri()).thenReturn(uri1);
fs2 = mock(FileSystem.class);
when(fs2.getUri()).thenReturn(uri2);
fs3 = mock(FileSystem.class);
when(fs3.getUri()).thenReturn(uri3);
fs4 = mock(FileSystem.class);
when(fs4.getUri()).thenReturn(uri4);
fs5 = mock(FileSystem.class);
when(fs5.getUri()).thenReturn(uri5);
fs6 = mock(FileSystem.class);
when(fs6.getUri()).thenReturn(uri6);
}
@Test
public void testCompareFsNull() throws Exception {
setupCompareFs();
assertEquals(FileUtil.compareFs(null,fs1),false);
assertEquals(FileUtil.compareFs(fs1,null),false);
}
@Test
public void testCompareFsDirectories() throws Exception {
setupCompareFs();
assertEquals(FileUtil.compareFs(fs1,fs1),true);
assertEquals(FileUtil.compareFs(fs1,fs2),false);
assertEquals(FileUtil.compareFs(fs1,fs5),false);
assertEquals(FileUtil.compareFs(fs3,fs4),true);
assertEquals(FileUtil.compareFs(fs1,fs6),false);
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Cluster;
@ -347,7 +348,7 @@ public class DistCp extends Configured implements Tool {
workDir = new Path(workDir, WIP_PREFIX + targetPath.getName() workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+ rand.nextInt()); + rand.nextInt());
FileSystem workFS = workDir.getFileSystem(configuration); FileSystem workFS = workDir.getFileSystem(configuration);
if (!DistCpUtils.compareFs(targetFS, workFS)) { if (!FileUtil.compareFs(targetFS, workFS)) {
throw new IllegalArgumentException("Work path " + workDir + throw new IllegalArgumentException("Work path " + workDir +
" and target path " + targetPath + " are in different file system"); " and target path " + targetPath + " are in different file system");
} }

View File

@ -468,43 +468,4 @@ public class DistCpUtils {
return (sourceChecksum == null || targetChecksum == null || return (sourceChecksum == null || targetChecksum == null ||
sourceChecksum.equals(targetChecksum)); sourceChecksum.equals(targetChecksum));
} }
/* see if two file systems are the same or not
*
*/
public static boolean compareFs(FileSystem srcFs, FileSystem destFs) {
URI srcUri = srcFs.getUri();
URI dstUri = destFs.getUri();
if (srcUri.getScheme() == null) {
return false;
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost != null) && (dstHost != null)) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch(UnknownHostException ue) {
if (LOG.isDebugEnabled())
LOG.debug("Could not compare file-systems. Unknown host: ", ue);
return false;
}
if (!srcHost.equals(dstHost)) {
return false;
}
}
else if (srcHost == null && dstHost != null) {
return false;
}
else if (srcHost != null) {
return false;
}
//check for ports
return srcUri.getPort() == dstUri.getPort();
}
} }