HADOOP-10198. DomainSocket: add support for socketpair. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1554888 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-01-02 20:00:07 +00:00
parent bb2e2fee60
commit 3a299fd7bb
4 changed files with 97 additions and 8 deletions

View File

@ -108,6 +108,9 @@ Trunk (Unreleased)
HADOOP-10141. Create KeyProvider API to separate encryption key storage
from the applications. (omalley)
HADOOP-19198. DomainSocket: add support for socketpair.
(Colin Patrick McCabe via wang)
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.

View File

@ -276,6 +276,24 @@ public static DomainSocket bindAndListen(String path) throws IOException {
return new DomainSocket(path, fd);
}
/**
* Create a pair of UNIX domain sockets which are connected to each other
* by calling socketpair(2).
*
* @return An array of two UNIX domain sockets connected to
* each other.
* @throws IOException on error.
*/
public static DomainSocket[] socketpair() throws IOException {
int fds[] = socketpair0();
return new DomainSocket[] {
new DomainSocket("(anonymous0)", fds[0]),
new DomainSocket("(anonymous1)", fds[1])
};
}
private static native int[] socketpair0() throws IOException;
private static native int accept0(int fd) throws IOException;
/**

View File

@ -364,6 +364,50 @@ JNIEnv *env, jclass clazz, jstring path)
return fd;
}
#define SOCKETPAIR_ARRAY_LEN 2
JNIEXPORT jarray JNICALL
Java_org_apache_hadoop_net_unix_DomainSocket_socketpair0(
JNIEnv *env, jclass clazz)
{
jarray arr = NULL;
int idx, err, fds[SOCKETPAIR_ARRAY_LEN] = { -1, -1 };
jthrowable jthr = NULL;
arr = (*env)->NewIntArray(env, SOCKETPAIR_ARRAY_LEN);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
(*env)->ExceptionClear(env);
goto done;
}
if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) < 0) {
err = errno;
jthr = newSocketException(env, err,
"socketpair(2) error: %s", terror(err));
goto done;
}
(*env)->SetIntArrayRegion(env, arr, 0, SOCKETPAIR_ARRAY_LEN, fds);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
(*env)->ExceptionClear(env);
goto done;
}
done:
if (jthr) {
(*env)->DeleteLocalRef(env, arr);
arr = NULL;
for (idx = 0; idx < SOCKETPAIR_ARRAY_LEN; idx++) {
if (fds[idx] >= 0) {
close(fds[idx]);
fds[idx] = -1;
}
}
(*env)->Throw(env, jthr);
}
return arr;
}
JNIEXPORT jint JNICALL
Java_org_apache_hadoop_net_unix_DomainSocket_accept0(
JNIEnv *env, jclass clazz, jint fd)

View File

@ -420,7 +420,8 @@ public int read(byte b[], int off, int length) throws IOException {
* @throws IOException
*/
void testClientServer1(final Class<? extends WriteStrategy> writeStrategyClass,
final Class<? extends ReadStrategy> readStrategyClass) throws Exception {
final Class<? extends ReadStrategy> readStrategyClass,
final DomainSocket preConnectedSockets[]) throws Exception {
final String TEST_PATH = new File(sockDir.getDir(),
"test_sock_client_server1").getAbsolutePath();
final byte clientMsg1[] = new byte[] { 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 };
@ -428,13 +429,15 @@ void testClientServer1(final Class<? extends WriteStrategy> writeStrategyClass,
final byte clientMsg2 = 0x45;
final ArrayBlockingQueue<Throwable> threadResults =
new ArrayBlockingQueue<Throwable>(2);
final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
final DomainSocket serv = (preConnectedSockets != null) ?
null : DomainSocket.bindAndListen(TEST_PATH);
Thread serverThread = new Thread() {
public void run(){
// Run server
DomainSocket conn = null;
try {
conn = serv.accept();
conn = preConnectedSockets != null ?
preConnectedSockets[0] : serv.accept();
byte in1[] = new byte[clientMsg1.length];
ReadStrategy reader = readStrategyClass.newInstance();
reader.init(conn);
@ -459,7 +462,8 @@ public void run(){
Thread clientThread = new Thread() {
public void run(){
try {
DomainSocket client = DomainSocket.connect(TEST_PATH);
DomainSocket client = preConnectedSockets != null ?
preConnectedSockets[1] : DomainSocket.connect(TEST_PATH);
WriteStrategy writer = writeStrategyClass.newInstance();
writer.init(client);
writer.write(clientMsg1);
@ -487,25 +491,45 @@ public void run(){
}
serverThread.join(120000);
clientThread.join(120000);
serv.close();
if (serv != null) {
serv.close();
}
}
@Test(timeout=180000)
public void testClientServerOutStreamInStream() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
InputStreamReadStrategy.class);
InputStreamReadStrategy.class, null);
}
@Test(timeout=180000)
public void testClientServerOutStreamInStreamWithSocketpair() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
InputStreamReadStrategy.class, DomainSocket.socketpair());
}
@Test(timeout=180000)
public void testClientServerOutStreamInDbb() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
DirectByteBufferReadStrategy.class);
DirectByteBufferReadStrategy.class, null);
}
@Test(timeout=180000)
public void testClientServerOutStreamInDbbWithSocketpair() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
DirectByteBufferReadStrategy.class, DomainSocket.socketpair());
}
@Test(timeout=180000)
public void testClientServerOutStreamInAbb() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
ArrayBackedByteBufferReadStrategy.class);
ArrayBackedByteBufferReadStrategy.class, null);
}
@Test(timeout=180000)
public void testClientServerOutStreamInAbbWithSocketpair() throws Exception {
testClientServer1(OutputStreamWriteStrategy.class,
ArrayBackedByteBufferReadStrategy.class, DomainSocket.socketpair());
}
static private class PassedFile {