merge -r 1325009:1325010 from trunk. FIXES: MAPREDUCE-4107
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1325011 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
462f9a5249
commit
d31abfe3f8
|
@ -134,6 +134,9 @@ Release 2.0.0 - UNRELEASED
|
|||
MAPREDUCE-4108. Fix tests in org.apache.hadoop.util.TestRunJar
|
||||
(Devaraj K via tgraves)
|
||||
|
||||
MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory
|
||||
(Devaraj K via tgraves)
|
||||
|
||||
Release 0.23.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -22,42 +22,96 @@ import java.net.InetSocketAddress;
|
|||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.JobStatus;
|
||||
import org.apache.hadoop.mapred.MiniMRCluster;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||
import org.apache.hadoop.net.StandardSocketFactory;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class checks that RPCs can use specialized socket factories.
|
||||
*/
|
||||
@Ignore
|
||||
public class TestSocketFactory extends TestCase {
|
||||
public class TestSocketFactory {
|
||||
|
||||
/**
|
||||
* Check that we can reach a NameNode or a JobTracker using a specific
|
||||
* Check that we can reach a NameNode or Resource Manager using a specific
|
||||
* socket factory
|
||||
*/
|
||||
@Test
|
||||
public void testSocketFactory() throws IOException {
|
||||
// Create a standard mini-cluster
|
||||
Configuration sconf = new Configuration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1)
|
||||
.build();
|
||||
final int nameNodePort = cluster.getNameNodePort();
|
||||
|
||||
// Get a reference to its DFS directly
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
assertTrue(fs instanceof DistributedFileSystem);
|
||||
Assert.assertTrue(fs instanceof DistributedFileSystem);
|
||||
DistributedFileSystem directDfs = (DistributedFileSystem) fs;
|
||||
|
||||
Configuration cconf = getCustomSocketConfigs(nameNodePort);
|
||||
|
||||
fs = FileSystem.get(cconf);
|
||||
Assert.assertTrue(fs instanceof DistributedFileSystem);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
|
||||
JobClient client = null;
|
||||
MiniMRYarnCluster miniMRYarnCluster = null;
|
||||
try {
|
||||
// This will test RPC to the NameNode only.
|
||||
// could we test Client-DataNode connections?
|
||||
Path filePath = new Path("/dir");
|
||||
|
||||
Assert.assertFalse(directDfs.exists(filePath));
|
||||
Assert.assertFalse(dfs.exists(filePath));
|
||||
|
||||
directDfs.mkdirs(filePath);
|
||||
Assert.assertTrue(directDfs.exists(filePath));
|
||||
Assert.assertTrue(dfs.exists(filePath));
|
||||
|
||||
// This will test RPC to a Resource Manager
|
||||
fs = FileSystem.get(sconf);
|
||||
JobConf jobConf = new JobConf();
|
||||
FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
|
||||
miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
|
||||
JobConf jconf = new JobConf(cconf);
|
||||
jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||
String rmAddress = jconf.get("yarn.resourcemanager.address");
|
||||
String[] split = rmAddress.split(":");
|
||||
jconf.set("yarn.resourcemanager.address", split[0] + ':'
|
||||
+ (Integer.parseInt(split[1]) + 10));
|
||||
client = new JobClient(jconf);
|
||||
|
||||
JobStatus[] jobs = client.jobsToComplete();
|
||||
Assert.assertTrue(jobs.length == 0);
|
||||
|
||||
} finally {
|
||||
closeClient(client);
|
||||
closeDfs(dfs);
|
||||
closeDfs(directDfs);
|
||||
stopMiniMRYarnCluster(miniMRYarnCluster);
|
||||
shutdownDFSCluster(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) {
|
||||
MiniMRYarnCluster miniMRYarnCluster;
|
||||
miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
|
||||
miniMRYarnCluster.init(jobConf);
|
||||
miniMRYarnCluster.start();
|
||||
return miniMRYarnCluster;
|
||||
}
|
||||
|
||||
private Configuration getCustomSocketConfigs(final int nameNodePort) {
|
||||
// Get another reference via network using a specific socket factory
|
||||
Configuration cconf = new Configuration();
|
||||
FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
|
||||
|
@ -68,63 +122,10 @@ public class TestSocketFactory extends TestCase {
|
|||
"org.apache.hadoop.ipc.DummySocketFactory");
|
||||
cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
|
||||
"org.apache.hadoop.ipc.DummySocketFactory");
|
||||
|
||||
fs = FileSystem.get(cconf);
|
||||
assertTrue(fs instanceof DistributedFileSystem);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
|
||||
JobClient client = null;
|
||||
MiniMRCluster mr = null;
|
||||
try {
|
||||
// This will test RPC to the NameNode only.
|
||||
// could we test Client-DataNode connections?
|
||||
Path filePath = new Path("/dir");
|
||||
|
||||
assertFalse(directDfs.exists(filePath));
|
||||
assertFalse(dfs.exists(filePath));
|
||||
|
||||
directDfs.mkdirs(filePath);
|
||||
assertTrue(directDfs.exists(filePath));
|
||||
assertTrue(dfs.exists(filePath));
|
||||
|
||||
// This will test TPC to a JobTracker
|
||||
fs = FileSystem.get(sconf);
|
||||
mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
|
||||
final int jobTrackerPort = mr.getJobTrackerPort();
|
||||
|
||||
JobConf jconf = new JobConf(cconf);
|
||||
jconf.set("mapred.job.tracker", String.format("localhost:%d",
|
||||
jobTrackerPort + 10));
|
||||
jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
|
||||
client = new JobClient(jconf);
|
||||
|
||||
JobStatus[] jobs = client.jobsToComplete();
|
||||
assertTrue(jobs.length == 0);
|
||||
|
||||
} finally {
|
||||
try {
|
||||
if (client != null)
|
||||
client.close();
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
return cconf;
|
||||
}
|
||||
try {
|
||||
if (dfs != null)
|
||||
dfs.close();
|
||||
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
try {
|
||||
if (directDfs != null)
|
||||
directDfs.close();
|
||||
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
private void shutdownDFSCluster(MiniDFSCluster cluster) {
|
||||
try {
|
||||
if (cluster != null)
|
||||
cluster.shutdown();
|
||||
|
@ -133,13 +134,37 @@ public class TestSocketFactory extends TestCase {
|
|||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
if (mr != null) {
|
||||
}
|
||||
|
||||
private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
|
||||
try {
|
||||
mr.shutdown();
|
||||
if (miniMRYarnCluster != null)
|
||||
miniMRYarnCluster.stop();
|
||||
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeDfs(DistributedFileSystem dfs) {
|
||||
try {
|
||||
if (dfs != null)
|
||||
dfs.close();
|
||||
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeClient(JobClient client) {
|
||||
try {
|
||||
if (client != null)
|
||||
client.close();
|
||||
} catch (Exception ignored) {
|
||||
// nothing we can do
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -155,32 +180,27 @@ class DummySocketFactory extends StandardSocketFactory {
|
|||
public DummySocketFactory() {
|
||||
}
|
||||
|
||||
/* @inheritDoc */
|
||||
@Override
|
||||
public Socket createSocket() throws IOException {
|
||||
return new Socket() {
|
||||
@Override
|
||||
public void connect(SocketAddress addr, int timeout)
|
||||
throws IOException {
|
||||
public void connect(SocketAddress addr, int timeout) throws IOException {
|
||||
|
||||
assert (addr instanceof InetSocketAddress);
|
||||
InetSocketAddress iaddr = (InetSocketAddress) addr;
|
||||
SocketAddress newAddr = null;
|
||||
if (iaddr.isUnresolved())
|
||||
newAddr =
|
||||
new InetSocketAddress(iaddr.getHostName(),
|
||||
newAddr = new InetSocketAddress(iaddr.getHostName(),
|
||||
iaddr.getPort() - 10);
|
||||
else
|
||||
newAddr =
|
||||
new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
|
||||
System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
|
||||
newAddr);
|
||||
newAddr = new InetSocketAddress(iaddr.getAddress(),
|
||||
iaddr.getPort() - 10);
|
||||
System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr);
|
||||
super.connect(newAddr, timeout);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/* @inheritDoc */
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
|
@ -191,11 +211,4 @@ class DummySocketFactory extends StandardSocketFactory {
|
|||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* @inheritDoc */
|
||||
@Override
|
||||
public int hashCode() {
|
||||
// Dummy hash code (to make find bugs happy)
|
||||
return 53;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue