From 3a299fd7bbacc69d8316a445fdf8c8bcbb79847f Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 2 Jan 2014 20:00:07 +0000 Subject: [PATCH] HADOOP-10198. DomainSocket: add support for socketpair. Contributed by Colin Patrick McCabe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1554888 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 ++ .../apache/hadoop/net/unix/DomainSocket.java | 18 ++++++++ .../org/apache/hadoop/net/unix/DomainSocket.c | 44 +++++++++++++++++++ .../hadoop/net/unix/TestDomainSocket.java | 40 +++++++++++++---- 4 files changed, 97 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 345f09e1789..029a5efb8fa 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -108,6 +108,9 @@ Trunk (Unreleased) HADOOP-10141. Create KeyProvider API to separate encryption key storage from the applications. (omalley) + HADOOP-19198. DomainSocket: add support for socketpair. + (Colin Patrick McCabe via wang) + BUG FIXES HADOOP-9451. Fault single-layer config if node group topology is enabled. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java index 4c6ae0592c2..bdf4d67a1af 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java @@ -276,6 +276,24 @@ public class DomainSocket implements Closeable { return new DomainSocket(path, fd); } + /** + * Create a pair of UNIX domain sockets which are connected to each other + * by calling socketpair(2). + * + * @return An array of two UNIX domain sockets connected to + * each other. + * @throws IOException on error. + */ + public static DomainSocket[] socketpair() throws IOException { + int fds[] = socketpair0(); + return new DomainSocket[] { + new DomainSocket("(anonymous0)", fds[0]), + new DomainSocket("(anonymous1)", fds[1]) + }; + } + + private static native int[] socketpair0() throws IOException; + private static native int accept0(int fd) throws IOException; /** diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c index 26423f8d836..48c4252fe7c 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocket.c @@ -364,6 +364,50 @@ JNIEnv *env, jclass clazz, jstring path) return fd; } +#define SOCKETPAIR_ARRAY_LEN 2 + +JNIEXPORT jarray JNICALL +Java_org_apache_hadoop_net_unix_DomainSocket_socketpair0( +JNIEnv *env, jclass clazz) +{ + jarray arr = NULL; + int idx, err, fds[SOCKETPAIR_ARRAY_LEN] = { -1, -1 }; + jthrowable jthr = NULL; + + arr = (*env)->NewIntArray(env, SOCKETPAIR_ARRAY_LEN); + jthr = (*env)->ExceptionOccurred(env); + if (jthr) { + (*env)->ExceptionClear(env); + goto done; + } + if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) < 0) { + err = errno; + jthr = newSocketException(env, err, + "socketpair(2) error: %s", terror(err)); + goto done; + } + (*env)->SetIntArrayRegion(env, arr, 0, SOCKETPAIR_ARRAY_LEN, fds); + jthr = (*env)->ExceptionOccurred(env); + if (jthr) { + (*env)->ExceptionClear(env); + goto done; + } + +done: + if (jthr) { + (*env)->DeleteLocalRef(env, arr); + arr = NULL; + for (idx = 0; idx < SOCKETPAIR_ARRAY_LEN; idx++) { + if (fds[idx] >= 0) { + close(fds[idx]); + fds[idx] = -1; + } + } + (*env)->Throw(env, jthr); + } + return arr; +} + JNIEXPORT jint JNICALL Java_org_apache_hadoop_net_unix_DomainSocket_accept0( JNIEnv *env, jclass clazz, jint fd) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java index d512027d45d..d6d9591ddd0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java @@ -420,7 +420,8 @@ public class TestDomainSocket { * @throws IOException */ void testClientServer1(final Class writeStrategyClass, - final Class readStrategyClass) throws Exception { + final Class readStrategyClass, + final DomainSocket preConnectedSockets[]) throws Exception { final String TEST_PATH = new File(sockDir.getDir(), "test_sock_client_server1").getAbsolutePath(); final byte clientMsg1[] = new byte[] { 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 }; @@ -428,13 +429,15 @@ public class TestDomainSocket { final byte clientMsg2 = 0x45; final ArrayBlockingQueue threadResults = new ArrayBlockingQueue(2); - final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH); + final DomainSocket serv = (preConnectedSockets != null) ? + null : DomainSocket.bindAndListen(TEST_PATH); Thread serverThread = new Thread() { public void run(){ // Run server DomainSocket conn = null; try { - conn = serv.accept(); + conn = preConnectedSockets != null ? + preConnectedSockets[0] : serv.accept(); byte in1[] = new byte[clientMsg1.length]; ReadStrategy reader = readStrategyClass.newInstance(); reader.init(conn); @@ -459,7 +462,8 @@ public class TestDomainSocket { Thread clientThread = new Thread() { public void run(){ try { - DomainSocket client = DomainSocket.connect(TEST_PATH); + DomainSocket client = preConnectedSockets != null ? + preConnectedSockets[1] : DomainSocket.connect(TEST_PATH); WriteStrategy writer = writeStrategyClass.newInstance(); writer.init(client); writer.write(clientMsg1); @@ -487,25 +491,45 @@ public class TestDomainSocket { } serverThread.join(120000); clientThread.join(120000); - serv.close(); + if (serv != null) { + serv.close(); + } } @Test(timeout=180000) public void testClientServerOutStreamInStream() throws Exception { testClientServer1(OutputStreamWriteStrategy.class, - InputStreamReadStrategy.class); + InputStreamReadStrategy.class, null); + } + + @Test(timeout=180000) + public void testClientServerOutStreamInStreamWithSocketpair() throws Exception { + testClientServer1(OutputStreamWriteStrategy.class, + InputStreamReadStrategy.class, DomainSocket.socketpair()); } @Test(timeout=180000) public void testClientServerOutStreamInDbb() throws Exception { testClientServer1(OutputStreamWriteStrategy.class, - DirectByteBufferReadStrategy.class); + DirectByteBufferReadStrategy.class, null); + } + + @Test(timeout=180000) + public void testClientServerOutStreamInDbbWithSocketpair() throws Exception { + testClientServer1(OutputStreamWriteStrategy.class, + DirectByteBufferReadStrategy.class, DomainSocket.socketpair()); } @Test(timeout=180000) public void testClientServerOutStreamInAbb() throws Exception { testClientServer1(OutputStreamWriteStrategy.class, - ArrayBackedByteBufferReadStrategy.class); + ArrayBackedByteBufferReadStrategy.class, null); + } + + @Test(timeout=180000) + public void testClientServerOutStreamInAbbWithSocketpair() throws Exception { + testClientServer1(OutputStreamWriteStrategy.class, + ArrayBackedByteBufferReadStrategy.class, DomainSocket.socketpair()); } static private class PassedFile {