From 6b58fa6494eb9f7cca91ccd86b4b518b59b4aa5e Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 3 May 2012 21:58:08 +0000 Subject: [PATCH] Amend previous commit of HADOOP-8350 (missed new SocketInputWrapper file) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1333650 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/net/SocketInputWrapper.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java new file mode 100644 index 00000000000..f5cbe17519d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.io.FilterInputStream; + +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketException; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +/** + * A wrapper stream around a socket which allows setting of its timeout. If the + * socket has a channel, this uses non-blocking IO via the package-private + * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by + * setting the underlying socket timeout itself. + */ +@InterfaceAudience.LimitedPrivate("HDFS") +@InterfaceStability.Unstable +public class SocketInputWrapper extends FilterInputStream { + private final Socket socket; + private final boolean hasChannel; + + SocketInputWrapper(Socket s, InputStream is) { + super(is); + this.socket = s; + this.hasChannel = s.getChannel() != null; + if (hasChannel) { + Preconditions.checkArgument(is instanceof SocketInputStream, + "Expected a SocketInputStream when there is a channel. " + + "Got: %s", is); + } + } + + /** + * Set the timeout for reads from this stream. + * + * Note: the behavior here can differ subtly depending on whether the + * underlying socket has an associated Channel. In particular, if there is no + * channel, then this call will affect the socket timeout for all + * readers of this socket. If there is a channel, then this call will affect + * the timeout only for this stream. As such, it is recommended to + * only create one {@link SocketInputWrapper} instance per socket. + * + * @param timeoutMs + * the new timeout, 0 for no timeout + * @throws SocketException + * if the timeout cannot be set + */ + public void setTimeout(long timeoutMs) throws SocketException { + if (hasChannel) { + ((SocketInputStream)in).setTimeout(timeoutMs); + } else { + socket.setSoTimeout((int)timeoutMs); + } + } + + /** + * @return an underlying ReadableByteChannel implementation. + * @throws IllegalStateException if this socket does not have a channel + */ + public ReadableByteChannel getReadableByteChannel() { + Preconditions.checkState(hasChannel, + "Socket %s does not have a channel", + this.socket); + return (SocketInputStream)in; + } +} \ No newline at end of file