From 512763f50f438de6c47a01c98e876a3320dc8f61 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 3 May 2012 21:58:34 +0000 Subject: [PATCH] HADOOP-8350. Improve NetUtils.getInputStream to return a stream which has a tunable timeout. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1333651 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/net/NetUtils.java | 55 ++++----- .../hadoop/net/SocketIOWithTimeout.java | 4 + .../apache/hadoop/net/SocketInputStream.java | 11 +- .../apache/hadoop/net/SocketInputWrapper.java | 88 ++++++++++++++ .../org/apache/hadoop/net/TestNetUtils.java | 87 ++++++++++++++ .../hadoop/net/TestSocketIOWithTimeout.java | 107 ++++++++++-------- .../hadoop/hdfs/RemoteBlockReader2.java | 11 +- 8 files changed, 275 insertions(+), 91 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index af94c1b898a..421490b32fc 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -155,6 +155,9 @@ Release 2.0.0 - UNRELEASED HADOOP-8347. Hadoop Common logs misspell 'successful'. (Philip Zeyliger via eli) + HADOOP-8350. Improve NetUtils.getInputStream to return a stream which has + a tunable timeout. 
(todd) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index d6bf5d92c3d..0fe61ad21c5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -375,53 +375,44 @@ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) { } /** - * Same as getInputStream(socket, socket.getSoTimeout()).

+ * Same as getInputStream(socket, socket.getSoTimeout()). + *

* - * From documentation for {@link #getInputStream(Socket, long)}:
- * Returns InputStream for the socket. If the socket has an associated - * SocketChannel then it returns a - * {@link SocketInputStream} with the given timeout. If the socket does not - * have a channel, {@link Socket#getInputStream()} is returned. In the later - * case, the timeout argument is ignored and the timeout set with - * {@link Socket#setSoTimeout(int)} applies for reads.

- * - * Any socket created using socket factories returned by {@link NetUtils}, - * must use this interface instead of {@link Socket#getInputStream()}. - * * @see #getInputStream(Socket, long) - * - * @param socket - * @return InputStream for reading from the socket. - * @throws IOException */ - public static InputStream getInputStream(Socket socket) + public static SocketInputWrapper getInputStream(Socket socket) throws IOException { return getInputStream(socket, socket.getSoTimeout()); } - + /** - * Returns InputStream for the socket. If the socket has an associated - * SocketChannel then it returns a - * {@link SocketInputStream} with the given timeout. If the socket does not - * have a channel, {@link Socket#getInputStream()} is returned. In the later - * case, the timeout argument is ignored and the timeout set with - * {@link Socket#setSoTimeout(int)} applies for reads.

+ * Return a {@link SocketInputWrapper} for the socket and set the given + * timeout. If the socket does not have an associated channel, then its socket + * timeout will be set to the specified value. Otherwise, a + * {@link SocketInputStream} will be created which reads with the configured + * timeout. * - * Any socket created using socket factories returned by {@link NetUtils}, + * Any socket created using socket factories returned by {@link NetUtils}, * must use this interface instead of {@link Socket#getInputStream()}. - * + * + * In general, this should be called only once on each socket: see the note + * in {@link SocketInputWrapper#setTimeout(long)} for more information. + * * @see Socket#getChannel() * * @param socket - * @param timeout timeout in milliseconds. This may not always apply. zero - * for waiting as long as necessary. - * @return InputStream for reading from the socket. + * @param timeout timeout in milliseconds. zero for waiting as + * long as necessary. + * @return SocketInputWrapper for reading from the socket. * @throws IOException */ - public static InputStream getInputStream(Socket socket, long timeout) + public static SocketInputWrapper getInputStream(Socket socket, long timeout) throws IOException { - return (socket.getChannel() == null) ? - socket.getInputStream() : new SocketInputStream(socket, timeout); + InputStream stm = (socket.getChannel() == null) ? 
+ socket.getInputStream() : new SocketInputStream(socket); + SocketInputWrapper w = new SocketInputWrapper(socket, stm); + w.setTimeout(timeout); + return w; } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java index e51602ff058..18874ecf91b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java @@ -247,6 +247,10 @@ void waitForIO(int ops) throws IOException { ops)); } } + + public void setTimeout(long timeoutMs) { + this.timeout = timeoutMs; + } private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java index ef8c02b7dda..a0b0c3ed0f9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java @@ -28,9 +28,6 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - /** * This implements an input stream that can have a timeout while reading. * This sets non-blocking flag on the socket channel. @@ -40,9 +37,7 @@ * IllegalBlockingModeException. * Please use {@link SocketOutputStream} for writing. 
*/ -@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) -@InterfaceStability.Unstable -public class SocketInputStream extends InputStream +class SocketInputStream extends InputStream implements ReadableByteChannel { private Reader reader; @@ -171,4 +166,8 @@ public int read(ByteBuffer dst) throws IOException { public void waitForReadable() throws IOException { reader.waitForIO(SelectionKey.OP_READ); } + + public void setTimeout(long timeoutMs) { + reader.setTimeout(timeoutMs); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java new file mode 100644 index 00000000000..f5cbe17519d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.net; + +import java.io.FilterInputStream; + +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketException; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +/** + * A wrapper stream around a socket which allows setting of its timeout. If the + * socket has a channel, this uses non-blocking IO via the package-private + * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by + * setting the underlying socket timeout itself. + */ +@InterfaceAudience.LimitedPrivate("HDFS") +@InterfaceStability.Unstable +public class SocketInputWrapper extends FilterInputStream { + private final Socket socket; + private final boolean hasChannel; + + SocketInputWrapper(Socket s, InputStream is) { + super(is); + this.socket = s; + this.hasChannel = s.getChannel() != null; + if (hasChannel) { + Preconditions.checkArgument(is instanceof SocketInputStream, + "Expected a SocketInputStream when there is a channel. " + + "Got: %s", is); + } + } + + /** + * Set the timeout for reads from this stream. + * + * Note: the behavior here can differ subtly depending on whether the + * underlying socket has an associated Channel. In particular, if there is no + * channel, then this call will affect the socket timeout for all + * readers of this socket. If there is a channel, then this call will affect + * the timeout only for this stream. As such, it is recommended to + * only create one {@link SocketInputWrapper} instance per socket. 
+ * + * @param timeoutMs + * the new timeout, 0 for no timeout + * @throws SocketException + * if the timeout cannot be set + */ + public void setTimeout(long timeoutMs) throws SocketException { + if (hasChannel) { + ((SocketInputStream)in).setTimeout(timeoutMs); + } else { + socket.setSoTimeout((int)timeoutMs); + } + } + + /** + * @return an underlying ReadableByteChannel implementation. + * @throws IllegalStateException if this socket does not have a channel + */ + public ReadableByteChannel getReadableByteChannel() { + Preconditions.checkState(hasChannel, + "Socket %s does not have a channel", + this.socket); + return (SocketInputStream)in; + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java index f10323b8273..61ac35c5cc9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java @@ -25,11 +25,14 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; import java.util.Enumeration; +import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; @@ -37,7 +40,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.NetUtilsTestResolver; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import 
org.junit.Test; @@ -50,6 +57,13 @@ public class TestNetUtils { private static final int LOCAL_PORT = 8080; private static final String LOCAL_PORT_NAME = Integer.toString(LOCAL_PORT); + /** + * Some slop around expected times when making sure timeouts behave + * as expected. We assume that they will be accurate to within + * this threshold. + */ + static final long TIME_FUDGE_MILLIS = 200; + /** * Test that we can't accidentally connect back to the connecting socket due * to a quirk in the TCP spec. @@ -81,6 +95,79 @@ public void testAvoidLoopbackTcpSockets() throws Exception { } } + @Test + public void testSocketReadTimeoutWithChannel() throws Exception { + doSocketReadTimeoutTest(true); + } + + @Test + public void testSocketReadTimeoutWithoutChannel() throws Exception { + doSocketReadTimeoutTest(false); + } + + + private void doSocketReadTimeoutTest(boolean withChannel) + throws IOException { + // Binding a ServerSocket is enough to accept connections. + // Rely on the backlog to accept for us. + ServerSocket ss = new ServerSocket(0); + + Socket s; + if (withChannel) { + s = NetUtils.getDefaultSocketFactory(new Configuration()) + .createSocket(); + Assume.assumeNotNull(s.getChannel()); + } else { + s = new Socket(); + assertNull(s.getChannel()); + } + + SocketInputWrapper stm = null; + try { + NetUtils.connect(s, ss.getLocalSocketAddress(), 1000); + + stm = NetUtils.getInputStream(s, 1000); + assertReadTimeout(stm, 1000); + + // Change timeout, make sure it applies. + stm.setTimeout(1); + assertReadTimeout(stm, 1); + + // If there is a channel, then setting the socket timeout + // should not matter. If there is not a channel, it will + // take effect. 
+ s.setSoTimeout(1000); + if (withChannel) { + assertReadTimeout(stm, 1); + } else { + assertReadTimeout(stm, 1000); + } + } finally { + IOUtils.closeStream(stm); + IOUtils.closeSocket(s); + ss.close(); + } + } + + private void assertReadTimeout(SocketInputWrapper stm, int timeoutMillis) + throws IOException { + long st = System.nanoTime(); + try { + stm.read(); + fail("Didn't time out"); + } catch (SocketTimeoutException ste) { + assertTimeSince(st, timeoutMillis); + } + } + + private void assertTimeSince(long startNanos, int expectedMillis) { + long durationNano = System.nanoTime() - startNanos; + long millis = TimeUnit.MILLISECONDS.convert( + durationNano, TimeUnit.NANOSECONDS); + assertTrue("Expected " + expectedMillis + "ms, but took " + millis, + Math.abs(millis - expectedMillis) < TIME_FUDGE_MILLIS); + } + /** * Test for { * @throws UnknownHostException @link NetUtils#getLocalInetAddress(String) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java index 0c887eb82b8..5e3116e89ed 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.SocketTimeoutException; import java.nio.channels.Pipe; @@ -26,8 +27,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MultithreadedTestUtil; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; -import junit.framework.TestCase; +import org.junit.Test; 
+import static org.junit.Assert.*; /** * This tests timout out from SocketInputStream and @@ -36,14 +42,17 @@ * Normal read and write using these streams are tested by pretty much * every DFS unit test. */ -public class TestSocketIOWithTimeout extends TestCase { +public class TestSocketIOWithTimeout { static Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class); private static int TIMEOUT = 1*1000; private static String TEST_STRING = "1234567890"; + + private MultithreadedTestUtil.TestContext ctx = new TestContext(); - private void doIO(InputStream in, OutputStream out) throws IOException { + private void doIO(InputStream in, OutputStream out, + int expectedTimeout) throws IOException { /* Keep on writing or reading until we get SocketTimeoutException. * It expects this exception to occur within 100 millis of TIMEOUT. */ @@ -61,34 +70,15 @@ private void doIO(InputStream in, OutputStream out) throws IOException { long diff = System.currentTimeMillis() - start; LOG.info("Got SocketTimeoutException as expected after " + diff + " millis : " + e.getMessage()); - assertTrue(Math.abs(TIMEOUT - diff) <= 200); + assertTrue(Math.abs(expectedTimeout - diff) <= + TestNetUtils.TIME_FUDGE_MILLIS); break; } } } - /** - * Just reads one byte from the input stream. 
- */ - static class ReadRunnable implements Runnable { - private InputStream in; - - public ReadRunnable(InputStream in) { - this.in = in; - } - public void run() { - try { - in.read(); - } catch (IOException e) { - LOG.info("Got expection while reading as expected : " + - e.getMessage()); - return; - } - assertTrue(false); - } - } - - public void testSocketIOWithTimeout() throws IOException { + @Test + public void testSocketIOWithTimeout() throws Exception { // first open pipe: Pipe pipe = Pipe.open(); @@ -96,7 +86,7 @@ public void testSocketIOWithTimeout() throws IOException { Pipe.SinkChannel sink = pipe.sink(); try { - InputStream in = new SocketInputStream(source, TIMEOUT); + final InputStream in = new SocketInputStream(source, TIMEOUT); OutputStream out = new SocketOutputStream(sink, TIMEOUT); byte[] writeBytes = TEST_STRING.getBytes(); @@ -105,37 +95,62 @@ public void testSocketIOWithTimeout() throws IOException { out.write(writeBytes); out.write(byteWithHighBit); - doIO(null, out); + doIO(null, out, TIMEOUT); in.read(readBytes); assertTrue(Arrays.equals(writeBytes, readBytes)); assertEquals(byteWithHighBit & 0xff, in.read()); - doIO(in, null); + doIO(in, null, TIMEOUT); + + // Change timeout on the read side. + ((SocketInputStream)in).setTimeout(TIMEOUT * 2); + doIO(in, null, TIMEOUT * 2); + /* * Verify that it handles interrupted threads properly. - * Use a large timeout and expect the thread to return quickly. + * Use a large timeout and expect the thread to return quickly + * upon interruption. 
 */ - in = new SocketInputStream(source, 0); - Thread thread = new Thread(new ReadRunnable(in)); - thread.start(); - - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) {} - + ((SocketInputStream)in).setTimeout(0); + TestingThread thread = new TestingThread(ctx) { + @Override + public void doWork() throws Exception { + try { + in.read(); + fail("Did not fail with interrupt"); + } catch (InterruptedIOException ste) { + LOG.info("Got expection while reading as expected : " + + ste.getMessage()); + } + } + }; + ctx.addThread(thread); + ctx.startThreads(); + // If the thread is interrupted before it calls read() + // then it throws ClosedByInterruptException due to + // some Java quirk. Waiting for it to call read() + // gets it into select(), so we get the expected + // InterruptedIOException. + Thread.sleep(1000); thread.interrupt(); - - try { - thread.join(); - } catch (InterruptedException e) { - throw new IOException("Unexpected InterruptedException : " + e); - } - + ctx.stop(); + //make sure the channels are still open assertTrue(source.isOpen()); assertTrue(sink.isOpen()); - + + // Nevertheless, the output stream is closed, because + // a partial write may have succeeded (see comment in + // SocketOutputStream#write(byte[], int, int)) + try { + out.write(1); + fail("Did not throw"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "stream is closed", ioe); + } + out.close(); assertFalse(sink.isOpen()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index ea247775714..55ce4c8fe13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; 
import org.apache.hadoop.hdfs.util.DirectBufferPool; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -428,11 +428,8 @@ public static BlockReader newBlockReader( Socket sock, String file, // // Get bytes in block, set streams // - Preconditions.checkArgument(sock.getChannel() != null, - "Socket %s does not have an associated Channel.", - sock); - SocketInputStream sin = - (SocketInputStream)NetUtils.getInputStream(sock); + SocketInputWrapper sin = NetUtils.getInputStream(sock); + ReadableByteChannel ch = sin.getReadableByteChannel(); DataInputStream in = new DataInputStream(sin); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( @@ -455,7 +452,7 @@ public static BlockReader newBlockReader( Socket sock, String file, } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); } static void checkSuccess(