diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 5c29a33312d..9d99c47b32f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.net.ConnectTimeoutException; /** *

@@ -543,6 +544,7 @@ public class RetryPolicies { e instanceof NoRouteToHostException || e instanceof UnknownHostException || e instanceof StandbyException || + e instanceof ConnectTimeoutException || isWrappedStandbyException(e)) { return new RetryAction( RetryAction.RetryDecision.FAILOVER_AND_RETRY, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 272ece5a1db..b09d9753acc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -68,6 +68,7 @@ import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderPro import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SaslRpcClient; @@ -510,14 +511,14 @@ public class Client { } this.socket.setSoTimeout(pingInterval); return; - } catch (SocketTimeoutException toe) { + } catch (ConnectTimeoutException toe) { /* Check for an address change and update the local reference. * Reset the failure counter if the address was changed */ if (updateAddress()) { timeoutFailures = ioFailures = 0; } - handleConnectionFailure(timeoutFailures++, + handleConnectionTimeout(timeoutFailures++, maxRetriesOnSocketTimeouts, toe); } catch (IOException ie) { if (updateAddress()) { @@ -679,7 +680,7 @@ public class Client { socket = null; } - /* Handle connection failures + /* Handle connection failures due to timeout on connect * * If the current number of retries is equal to the max number of retries, * stop retrying and throw the exception; Otherwise backoff 1 second and @@ -693,7 +694,7 @@ public class Client { * @param ioe failure reason * @throws IOException if max number of retries is reached */ - private void handleConnectionFailure( + private void handleConnectionTimeout( int curRetries, int maxRetries, IOException ioe) throws IOException { closeConnection(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ConnectTimeoutException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ConnectTimeoutException.java new file mode 100644 index 00000000000..12d7d175906 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ConnectTimeoutException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.net.SocketTimeoutException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Thrown by {@link NetUtils#connect(java.net.Socket, java.net.SocketAddress, int)} + * if it times out while connecting to the remote host. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ConnectTimeoutException extends SocketTimeoutException { + private static final long serialVersionUID = 1L; + + public ConnectTimeoutException(String msg) { + super(msg); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index 29d8af9fd7f..7ff9030f94a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.net; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Constructor; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -517,11 +518,15 @@ public class NetUtils { socket.bind(localAddr); } - if (ch == null) { - // let the default implementation handle it. - socket.connect(endpoint, timeout); - } else { - SocketIOWithTimeout.connect(ch, endpoint, timeout); + try { + if (ch == null) { + // let the default implementation handle it. + socket.connect(endpoint, timeout); + } else { + SocketIOWithTimeout.connect(ch, endpoint, timeout); + } + } catch (SocketTimeoutException ste) { + throw new ConnectTimeoutException(ste.getMessage()); } // There is a very rare case allowed by the TCP specification, such that @@ -719,7 +724,7 @@ public class NetUtils { + see("BindException")); } else if (exception instanceof ConnectException) { // connection refused; include the host:port in the error - return (ConnectException) new ConnectException( + return wrapWithMessage(exception, "Call From " + localHost + " to " @@ -729,32 +734,28 @@ public class NetUtils { + " failed on connection exception: " + exception + ";" - + see("ConnectionRefused")) - .initCause(exception); + + see("ConnectionRefused")); } else if (exception instanceof UnknownHostException) { - return (UnknownHostException) new UnknownHostException( + return wrapWithMessage(exception, "Invalid host name: " + getHostDetailsAsString(destHost, destPort, localHost) + exception + ";" - + see("UnknownHost")) - .initCause(exception); + + see("UnknownHost")); } else if (exception instanceof SocketTimeoutException) { - return (SocketTimeoutException) new SocketTimeoutException( + return wrapWithMessage(exception, "Call From " + localHost + " to " + destHost + ":" + destPort + " failed on socket timeout exception: " + exception + ";" - + see("SocketTimeout")) - .initCause(exception); + + see("SocketTimeout")); } else if (exception instanceof NoRouteToHostException) { - return (NoRouteToHostException) new NoRouteToHostException( + return wrapWithMessage(exception, "No Route to Host from " + localHost + " to " + destHost + ":" + destPort + " failed on socket timeout exception: " + exception + ";" - + see("NoRouteToHost")) - .initCause(exception); + + see("NoRouteToHost")); } else { return (IOException) new IOException("Failed on local exception: " @@ -769,6 +770,21 @@ public class NetUtils { private static String see(final String entry) { return FOR_MORE_DETAILS_SEE + HADOOP_WIKI + entry; } + + @SuppressWarnings("unchecked") + private static T wrapWithMessage( + T exception, String msg) { + Class clazz = exception.getClass(); + try { + Constructor ctor = clazz.getConstructor(String.class); + Throwable t = ctor.newInstance(msg); + return (T)(t.initCause(exception)); + } catch (Throwable e) { + LOG.warn("Unable to wrap exception of type " + + clazz + ": it has no (String) constructor", e); + return exception; + } + } /** * Get the host details as a string diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 5762b56b9a0..3847bfd0814 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import java.util.Random; @@ -586,7 +587,7 @@ public class TestIPC { private void assertRetriesOnSocketTimeouts(Configuration conf, int maxTimeoutRetries) throws IOException, InterruptedException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class); - doThrow(new SocketTimeoutException()).when(mockFactory).createSocket(); + doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket(); Client client = new Client(IntWritable.class, conf, mockFactory); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 458c537e1b9..fa51edeeecc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -457,6 +457,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4462. 2NN will fail to checkpoint after an HDFS upgrade from a pre-federation version of HDFS. (atm) + HDFS-4404. Create file failure when the machine of first attempted NameNode + is down. (Todd Lipcon via atm) + BREAKDOWN OF HDFS-3077 SUBTASKS HDFS-3077. Quorum-based protocol for reading and writing edit logs. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index a88e8a74edf..86bcae3a0a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -23,22 +23,34 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; +import javax.net.SocketFactory; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.ConnectTimeoutException; +import org.apache.hadoop.net.StandardSocketFactory; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestDFSClientFailover { @@ -91,6 +103,63 @@ public class TestDFSClientFailover { fs.close(); } + /** + * Test that even a non-idempotent method will properly fail-over if the + * first IPC attempt times out trying to connect. Regression test for + * HDFS-4404. + */ + @Test + public void testFailoverOnConnectTimeout() throws Exception { + conf.setClass(CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, + InjectingSocketFactory.class, SocketFactory.class); + // Set up the InjectingSocketFactory to throw a ConnectTimeoutException + // when connecting to the first NN. + InjectingSocketFactory.portToInjectOn = cluster.getNameNodePort(0); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + + // Make the second NN the active one. + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + + // Call a non-idempotent method, and ensure the failover of the call proceeds + // successfully. + IOUtils.closeStream(fs.create(TEST_FILE)); + } + + private static class InjectingSocketFactory extends StandardSocketFactory { + + static SocketFactory defaultFactory = SocketFactory.getDefault(); + + static int portToInjectOn; + + @Override + public Socket createSocket() throws IOException { + Socket spy = Mockito.spy(defaultFactory.createSocket()); + // Simplify our spying job by not having to also spy on the channel + Mockito.doReturn(null).when(spy).getChannel(); + // Throw a ConnectTimeoutException when connecting to our target "bad" + // host. + Mockito.doThrow(new ConnectTimeoutException("injected")) + .when(spy).connect( + Mockito.argThat(new MatchesPort()), + Mockito.anyInt()); + return spy; + } + + private class MatchesPort extends BaseMatcher { + @Override + public boolean matches(Object arg0) { + return ((InetSocketAddress)arg0).getPort() == portToInjectOn; + } + + @Override + public void describeTo(Description desc) { + desc.appendText("matches port " + portToInjectOn); + } + } + } + /** * Regression test for HDFS-2683. */