diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index af94c1b898a..421490b32fc 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -155,6 +155,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8347. Hadoop Common logs misspell 'successful'.
(Philip Zeyliger via eli)
+ HADOOP-8350. Improve NetUtils.getInputStream to return a stream which has
+ a tunable timeout. (todd)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index d6bf5d92c3d..0fe61ad21c5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -375,53 +375,44 @@ public class NetUtils {
}
/**
- * Same as getInputStream(socket, socket.getSoTimeout()).
+ * Same as getInputStream(socket, socket.getSoTimeout()).
+ *
*
- * From documentation for {@link #getInputStream(Socket, long)}:
- * Returns InputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a
- * {@link SocketInputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getInputStream()} is returned. In the later
- * case, the timeout argument is ignored and the timeout set with
- * {@link Socket#setSoTimeout(int)} applies for reads.
- *
- * Any socket created using socket factories returned by {@link NetUtils},
- * must use this interface instead of {@link Socket#getInputStream()}.
- *
* @see #getInputStream(Socket, long)
- *
- * @param socket
- * @return InputStream for reading from the socket.
- * @throws IOException
*/
- public static InputStream getInputStream(Socket socket)
+ public static SocketInputWrapper getInputStream(Socket socket)
throws IOException {
return getInputStream(socket, socket.getSoTimeout());
}
-
+
/**
- * Returns InputStream for the socket. If the socket has an associated
- * SocketChannel then it returns a
- * {@link SocketInputStream} with the given timeout. If the socket does not
- * have a channel, {@link Socket#getInputStream()} is returned. In the later
- * case, the timeout argument is ignored and the timeout set with
- * {@link Socket#setSoTimeout(int)} applies for reads.
+ * Return a {@link SocketInputWrapper} for the socket and set the given
+ * timeout. If the socket does not have an associated channel, then its socket
+ * timeout will be set to the specified value. Otherwise, a
+ * {@link SocketInputStream} will be created which reads with the configured
+ * timeout.
*
- * Any socket created using socket factories returned by {@link NetUtils},
+ * Any socket created using socket factories returned by {@link NetUtils},
* must use this interface instead of {@link Socket#getInputStream()}.
- *
+ *
+ * In general, this should be called only once on each socket: see the note
+ * in {@link SocketInputWrapper#setTimeout(long)} for more information.
+ *
* @see Socket#getChannel()
*
* @param socket
- * @param timeout timeout in milliseconds. This may not always apply. zero
- * for waiting as long as necessary.
- * @return InputStream for reading from the socket.
+ * @param timeout timeout in milliseconds. zero for waiting as
+ * long as necessary.
+ * @return SocketInputWrapper for reading from the socket.
* @throws IOException
*/
- public static InputStream getInputStream(Socket socket, long timeout)
+ public static SocketInputWrapper getInputStream(Socket socket, long timeout)
throws IOException {
- return (socket.getChannel() == null) ?
- socket.getInputStream() : new SocketInputStream(socket, timeout);
+ InputStream stm = (socket.getChannel() == null) ?
+ socket.getInputStream() : new SocketInputStream(socket);
+ SocketInputWrapper w = new SocketInputWrapper(socket, stm);
+ w.setTimeout(timeout);
+ return w;
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
index e51602ff058..18874ecf91b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
@@ -247,6 +247,10 @@ abstract class SocketIOWithTimeout {
ops));
}
}
+
+ /**
+ * Overwrites this object's {@code timeout} field with the given value.
+ * NOTE(review): presumably the field is consulted by later IO waits; that
+ * code is outside this hunk -- confirm.
+ *
+ * @param timeoutMs the new timeout (milliseconds, going by the name)
+ */
+ public void setTimeout(long timeoutMs) {
+ this.timeout = timeoutMs;
+ }
private static String timeoutExceptionString(SelectableChannel channel,
long timeout, int ops) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java
index ef8c02b7dda..a0b0c3ed0f9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java
@@ -28,9 +28,6 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
/**
* This implements an input stream that can have a timeout while reading.
* This sets non-blocking flag on the socket channel.
@@ -40,9 +37,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* IllegalBlockingModeException.
* Please use {@link SocketOutputStream} for writing.
*/
-@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
-@InterfaceStability.Unstable
-public class SocketInputStream extends InputStream
+class SocketInputStream extends InputStream
implements ReadableByteChannel {
private Reader reader;
@@ -171,4 +166,8 @@ public class SocketInputStream extends InputStream
public void waitForReadable() throws IOException {
reader.waitForIO(SelectionKey.OP_READ);
}
+
+ /**
+ * Changes the read timeout by forwarding the value to this stream's
+ * {@code reader}.
+ *
+ * @param timeoutMs the new timeout (milliseconds, going by the name)
+ */
+ public void setTimeout(long timeoutMs) {
+ reader.setTimeout(timeoutMs);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java
new file mode 100644
index 00000000000..f5cbe17519d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.io.FilterInputStream;
+
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A wrapper stream around a socket which allows setting of its timeout. If the
+ * socket has a channel, this uses non-blocking IO via the package-private
+ * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by
+ * setting the underlying socket timeout itself.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+@InterfaceStability.Unstable
+public class SocketInputWrapper extends FilterInputStream {
+ private final Socket socket;
+ private final boolean hasChannel;
+
+ SocketInputWrapper(Socket s, InputStream is) {
+ super(is);
+ this.socket = s;
+ this.hasChannel = s.getChannel() != null;
+ if (hasChannel) {
+ Preconditions.checkArgument(is instanceof SocketInputStream,
+ "Expected a SocketInputStream when there is a channel. " +
+ "Got: %s", is);
+ }
+ }
+
+ /**
+ * Set the timeout for reads from this stream.
+ *
+ * Note: the behavior here can differ subtly depending on whether the
+ * underlying socket has an associated Channel. In particular, if there is no
+ * channel, then this call will affect the socket timeout for all
+ * readers of this socket. If there is a channel, then this call will affect
+ * the timeout only for this stream. As such, it is recommended to
+ * only create one {@link SocketInputWrapper} instance per socket.
+ *
+ * @param timeoutMs
+ * the new timeout, 0 for no timeout
+ * @throws SocketException
+ * if the timeout cannot be set
+ */
+ public void setTimeout(long timeoutMs) throws SocketException {
+ if (hasChannel) {
+ ((SocketInputStream)in).setTimeout(timeoutMs);
+ } else {
+ socket.setSoTimeout((int)timeoutMs);
+ }
+ }
+
+ /**
+ * @return an underlying ReadableByteChannel implementation.
+ * @throws IllegalStateException if this socket does not have a channel
+ */
+ public ReadableByteChannel getReadableByteChannel() {
+ Preconditions.checkState(hasChannel,
+ "Socket %s does not have a channel",
+ this.socket);
+ return (SocketInputStream)in;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
index f10323b8273..61ac35c5cc9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
@@ -25,11 +25,14 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
+import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
@@ -37,7 +40,11 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.NetUtilsTestResolver;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -50,6 +57,13 @@ public class TestNetUtils {
private static final int LOCAL_PORT = 8080;
private static final String LOCAL_PORT_NAME = Integer.toString(LOCAL_PORT);
+ /**
+ * Some slop around expected times when making sure timeouts behave
+ * as expected. We assume that they will be accurate to within
+ * this threshold.
+ */
+ static final long TIME_FUDGE_MILLIS = 200;
+
/**
* Test that we can't accidentally connect back to the connecting socket due
* to a quirk in the TCP spec.
@@ -81,6 +95,79 @@ public class TestNetUtils {
}
}
+ /** Runs the shared read-timeout scenario with {@code withChannel = true}. */
+ @Test
+ public void testSocketReadTimeoutWithChannel() throws Exception {
+ doSocketReadTimeoutTest(true);
+ }
+
+ /** Runs the shared read-timeout scenario with {@code withChannel = false}. */
+ @Test
+ public void testSocketReadTimeoutWithoutChannel() throws Exception {
+ doSocketReadTimeoutTest(false);
+ }
+
+
+  /**
+   * Connects a client socket to a local listening socket that never sends
+   * data, then checks that reads through the {@link SocketInputWrapper}
+   * returned by {@code NetUtils.getInputStream} time out after the configured
+   * interval, both initially and after the timeout is changed.
+   *
+   * @param withChannel if true, use a socket from the default socket factory
+   *          (the test is skipped unless it has a channel); if false, use a
+   *          plain {@link Socket}, which must not have a channel
+   * @throws IOException on unexpected socket errors
+   */
+  private void doSocketReadTimeoutTest(boolean withChannel)
+      throws IOException {
+    // Binding a ServerSocket is enough to accept connections.
+    // Rely on the backlog to accept for us.
+    ServerSocket ss = new ServerSocket(0);
+    try {
+      Socket s = withChannel
+          ? NetUtils.getDefaultSocketFactory(new Configuration()).createSocket()
+          : new Socket();
+      SocketInputWrapper stm = null;
+      try {
+        // The assumption/assertion can throw, so it runs inside the try
+        // blocks: both sockets are closed even when the test is skipped
+        // or fails at this point.
+        if (withChannel) {
+          Assume.assumeNotNull(s.getChannel());
+        } else {
+          assertNull(s.getChannel());
+        }
+
+        NetUtils.connect(s, ss.getLocalSocketAddress(), 1000);
+
+        stm = NetUtils.getInputStream(s, 1000);
+        assertReadTimeout(stm, 1000);
+
+        // Change timeout, make sure it applies.
+        stm.setTimeout(1);
+        assertReadTimeout(stm, 1);
+
+        // If there is a channel, then setting the socket timeout
+        // should not matter. If there is not a channel, it will
+        // take effect.
+        s.setSoTimeout(1000);
+        if (withChannel) {
+          assertReadTimeout(stm, 1);
+        } else {
+          assertReadTimeout(stm, 1000);
+        }
+      } finally {
+        IOUtils.closeStream(stm);
+        IOUtils.closeSocket(s);
+      }
+    } finally {
+      ss.close();
+    }
+  }
+
+ /**
+ * Issues a single {@code read()} on the stream and expects it to fail with
+ * {@link SocketTimeoutException}; the elapsed time is then checked via
+ * {@code assertTimeSince}. A read that returns normally fails the test.
+ *
+ * @param stm stream to read from
+ * @param timeoutMillis the timeout, in milliseconds, the read is expected
+ * to run into
+ * @throws IOException if the read fails with any other IO error
+ */
+ private void assertReadTimeout(SocketInputWrapper stm, int timeoutMillis)
+ throws IOException {
+ long st = System.nanoTime();
+ try {
+ stm.read();
+ fail("Didn't time out");
+ } catch (SocketTimeoutException ste) {
+ assertTimeSince(st, timeoutMillis);
+ }
+ }
+
+  /**
+   * Asserts that the wall-clock time elapsed since {@code startNanos}
+   * (a {@link System#nanoTime()} reading) differs from
+   * {@code expectedMillis} by strictly less than {@code TIME_FUDGE_MILLIS}.
+   *
+   * @param startNanos {@code System.nanoTime()} value taken at the start
+   * @param expectedMillis the expected elapsed time in milliseconds
+   */
+  private void assertTimeSince(long startNanos, int expectedMillis) {
+    long elapsedMillis =
+        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+    long drift = Math.abs(elapsedMillis - expectedMillis);
+    String failure =
+        "Expected " + expectedMillis + "ms, but took " + elapsedMillis;
+    assertTrue(failure, drift < TIME_FUDGE_MILLIS);
+  }
+
/**
* Test for {
* @throws UnknownHostException @link NetUtils#getLocalInetAddress(String)
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
index 0c887eb82b8..5e3116e89ed 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.net;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.channels.Pipe;
@@ -26,8 +27,13 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
/**
* This tests timout out from SocketInputStream and
@@ -36,14 +42,17 @@ import junit.framework.TestCase;
* Normal read and write using these streams are tested by pretty much
* every DFS unit test.
*/
-public class TestSocketIOWithTimeout extends TestCase {
+public class TestSocketIOWithTimeout {
static Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
private static int TIMEOUT = 1*1000;
private static String TEST_STRING = "1234567890";
+
+ private MultithreadedTestUtil.TestContext ctx = new TestContext();
- private void doIO(InputStream in, OutputStream out) throws IOException {
+ private void doIO(InputStream in, OutputStream out,
+ int expectedTimeout) throws IOException {
/* Keep on writing or reading until we get SocketTimeoutException.
* It expects this exception to occur within 100 millis of TIMEOUT.
*/
@@ -61,34 +70,15 @@ public class TestSocketIOWithTimeout extends TestCase {
long diff = System.currentTimeMillis() - start;
LOG.info("Got SocketTimeoutException as expected after " +
diff + " millis : " + e.getMessage());
- assertTrue(Math.abs(TIMEOUT - diff) <= 200);
+ assertTrue(Math.abs(expectedTimeout - diff) <=
+ TestNetUtils.TIME_FUDGE_MILLIS);
break;
}
}
}
- /**
- * Just reads one byte from the input stream.
- */
- static class ReadRunnable implements Runnable {
- private InputStream in;
-
- public ReadRunnable(InputStream in) {
- this.in = in;
- }
- public void run() {
- try {
- in.read();
- } catch (IOException e) {
- LOG.info("Got expection while reading as expected : " +
- e.getMessage());
- return;
- }
- assertTrue(false);
- }
- }
-
- public void testSocketIOWithTimeout() throws IOException {
+ @Test
+ public void testSocketIOWithTimeout() throws Exception {
// first open pipe:
Pipe pipe = Pipe.open();
@@ -96,7 +86,7 @@ public class TestSocketIOWithTimeout extends TestCase {
Pipe.SinkChannel sink = pipe.sink();
try {
- InputStream in = new SocketInputStream(source, TIMEOUT);
+ final InputStream in = new SocketInputStream(source, TIMEOUT);
OutputStream out = new SocketOutputStream(sink, TIMEOUT);
byte[] writeBytes = TEST_STRING.getBytes();
@@ -105,37 +95,62 @@ public class TestSocketIOWithTimeout extends TestCase {
out.write(writeBytes);
out.write(byteWithHighBit);
- doIO(null, out);
+ doIO(null, out, TIMEOUT);
in.read(readBytes);
assertTrue(Arrays.equals(writeBytes, readBytes));
assertEquals(byteWithHighBit & 0xff, in.read());
- doIO(in, null);
+ doIO(in, null, TIMEOUT);
+
+ // Change timeout on the read side.
+ ((SocketInputStream)in).setTimeout(TIMEOUT * 2);
+ doIO(in, null, TIMEOUT * 2);
+
/*
* Verify that it handles interrupted threads properly.
- * Use a large timeout and expect the thread to return quickly.
+ * Use a large timeout and expect the thread to return quickly
+ * upon interruption.
*/
- in = new SocketInputStream(source, 0);
- Thread thread = new Thread(new ReadRunnable(in));
- thread.start();
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {}
-
+ ((SocketInputStream)in).setTimeout(0);
+ TestingThread thread = new TestingThread(ctx) {
+ @Override
+ public void doWork() throws Exception {
+ try {
+ in.read();
+ fail("Did not fail with interrupt");
+ } catch (InterruptedIOException ste) {
+ LOG.info("Got expection while reading as expected : " +
+ ste.getMessage());
+ }
+ }
+ };
+ ctx.addThread(thread);
+ ctx.startThreads();
+ // If the thread is interrupted before it calls read()
+ // then it throws ClosedByInterruptException due to
+ // some Java quirk. Waiting for it to call read()
+ // gets it into select(), so we get the expected
+ // InterruptedIOException.
+ Thread.sleep(1000);
thread.interrupt();
-
- try {
- thread.join();
- } catch (InterruptedException e) {
- throw new IOException("Unexpected InterruptedException : " + e);
- }
-
+ ctx.stop();
+
//make sure the channels are still open
assertTrue(source.isOpen());
assertTrue(sink.isOpen());
-
+
+ // Nevertheless, the output stream is closed, because
+ // a partial write may have succeeded (see comment in
+ // SocketOutputStream#write(byte[], int, int))
+ try {
+ out.write(1);
+ fail("Did not throw");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "stream is closed", ioe);
+ }
+
out.close();
assertFalse(sink.isOpen());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index ea247775714..55ce4c8fe13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -428,11 +428,8 @@ public class RemoteBlockReader2 implements BlockReader {
//
// Get bytes in block, set streams
//
- Preconditions.checkArgument(sock.getChannel() != null,
- "Socket %s does not have an associated Channel.",
- sock);
- SocketInputStream sin =
- (SocketInputStream)NetUtils.getInputStream(sock);
+ SocketInputWrapper sin = NetUtils.getInputStream(sock);
+ ReadableByteChannel ch = sin.getReadableByteChannel();
DataInputStream in = new DataInputStream(sin);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
@@ -455,7 +452,7 @@ public class RemoteBlockReader2 implements BlockReader {
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
- sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
}
static void checkSuccess(