HDFS-4404. Create file failure when the machine of first attempted NameNode is down. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1442462 13f79535-47bb-0310-9956-ffa450edef68
Author: Aaron Myers
Date:   2013-02-05 03:23:46 +00:00
commit 0da09ea945
parent 992ba4c79c

7 changed files with 151 additions and 22 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.net.ConnectTimeoutException;
 
 /**
  * <p>
@@ -543,6 +544,7 @@ public class RetryPolicies {
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
           e instanceof StandbyException ||
+          e instanceof ConnectTimeoutException ||
           isWrappedStandbyException(e)) {
         return new RetryAction(
             RetryAction.RetryDecision.FAILOVER_AND_RETRY,
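
This hunk is the heart of the fix: ConnectTimeoutException joins the set of exceptions that trigger FAILOVER_AND_RETRY, so a client whose first NameNode is unreachable fails over to the next one instead of burning its retry budget against a dead host. A minimal, self-contained sketch of that classification step (illustrative only; the real policy lives in RetryPolicies and returns a RetryAction, and the local ConnectTimeoutException class below merely stands in for org.apache.hadoop.net.ConnectTimeoutException):

    import java.net.ConnectException;
    import java.net.NoRouteToHostException;
    import java.net.SocketTimeoutException;
    import java.net.UnknownHostException;

    // Stand-in for org.apache.hadoop.net.ConnectTimeoutException.
    class ConnectTimeoutException extends SocketTimeoutException {
      ConnectTimeoutException(String msg) { super(msg); }
    }

    final class FailoverClassifier {
      // Before the patch, a connect-phase timeout surfaced as a plain
      // SocketTimeoutException and fell through to the retry-same-host
      // branch; listing the new subtype here routes it to failover instead.
      static boolean shouldFailover(Exception e) {
        return e instanceof ConnectException
            || e instanceof NoRouteToHostException
            || e instanceof UnknownHostException
            || e instanceof ConnectTimeoutException;
      }
    }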

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderPro
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcStatusProto;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcClient;
@@ -510,14 +511,14 @@ public class Client {
         }
         this.socket.setSoTimeout(pingInterval);
         return;
-      } catch (SocketTimeoutException toe) {
+      } catch (ConnectTimeoutException toe) {
         /* Check for an address change and update the local reference.
          * Reset the failure counter if the address was changed
          */
         if (updateAddress()) {
           timeoutFailures = ioFailures = 0;
         }
-        handleConnectionFailure(timeoutFailures++,
+        handleConnectionTimeout(timeoutFailures++,
             maxRetriesOnSocketTimeouts, toe);
       } catch (IOException ie) {
         if (updateAddress()) {
@@ -679,7 +680,7 @@ public class Client {
       socket = null;
     }
 
-    /* Handle connection failures
+    /* Handle connection failures due to timeout on connect
      *
      * If the current number of retries is equal to the max number of retries,
      * stop retrying and throw the exception; Otherwise backoff 1 second and
@@ -693,7 +694,7 @@ public class Client {
      * @param ioe failure reason
      * @throws IOException if max number of retries is reached
      */
-    private void handleConnectionFailure(
+    private void handleConnectionTimeout(
        int curRetries, int maxRetries, IOException ioe) throws IOException {
      closeConnection();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ConnectTimeoutException.java (new file)

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown by {@link NetUtils#connect(java.net.Socket, java.net.SocketAddress, int)}
+ * if it times out while connecting to the remote host.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ConnectTimeoutException extends SocketTimeoutException {
+  private static final long serialVersionUID = 1L;
+
+  public ConnectTimeoutException(String msg) {
+    super(msg);
+  }
+}
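
Because the new class extends SocketTimeoutException, every existing catch (SocketTimeoutException) block keeps compiling and firing; the subtype only adds information for callers that want it. A hedged usage sketch (a hypothetical caller using plain JDK sockets for illustration; only Hadoop's NetUtils.connect, shown in the next file, actually throws the new type):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.net.SocketTimeoutException;

    class ConnectExample {
      static void probe(String host, int port) {
        try (Socket s = new Socket()) {
          s.connect(new InetSocketAddress(host, port), 5000);
        } catch (SocketTimeoutException ste) {
          // With plain java.net.Socket, connect-phase and read timeouts both
          // surface as SocketTimeoutException. Hadoop's NetUtils.connect
          // rethrows the connect-phase case as ConnectTimeoutException so
          // retry policies can fail over instead of retrying the dead host.
        } catch (IOException ioe) {
          // connection refused, unreachable network, etc.
        }
      }
    }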

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.net;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -517,12 +518,16 @@ public class NetUtils {
       socket.bind(localAddr);
     }
 
-    if (ch == null) {
-      // let the default implementation handle it.
-      socket.connect(endpoint, timeout);
-    } else {
-      SocketIOWithTimeout.connect(ch, endpoint, timeout);
-    }
+    try {
+      if (ch == null) {
+        // let the default implementation handle it.
+        socket.connect(endpoint, timeout);
+      } else {
+        SocketIOWithTimeout.connect(ch, endpoint, timeout);
+      }
+    } catch (SocketTimeoutException ste) {
+      throw new ConnectTimeoutException(ste.getMessage());
+    }
 
     // There is a very rare case allowed by the TCP specification, such that
     // if we are trying to connect to an endpoint on the local machine,
@@ -719,7 +724,7 @@ public class NetUtils {
           + see("BindException"));
     } else if (exception instanceof ConnectException) {
       // connection refused; include the host:port in the error
-      return (ConnectException) new ConnectException(
+      return wrapWithMessage(exception,
           "Call From "
           + localHost
           + " to "
@@ -729,32 +734,28 @@ public class NetUtils {
           + " failed on connection exception: "
           + exception
           + ";"
-          + see("ConnectionRefused"))
-          .initCause(exception);
+          + see("ConnectionRefused"));
     } else if (exception instanceof UnknownHostException) {
-      return (UnknownHostException) new UnknownHostException(
+      return wrapWithMessage(exception,
           "Invalid host name: "
          + getHostDetailsAsString(destHost, destPort, localHost)
          + exception
          + ";"
-          + see("UnknownHost"))
-          .initCause(exception);
+          + see("UnknownHost"));
     } else if (exception instanceof SocketTimeoutException) {
-      return (SocketTimeoutException) new SocketTimeoutException(
+      return wrapWithMessage(exception,
           "Call From "
          + localHost + " to " + destHost + ":" + destPort
          + " failed on socket timeout exception: " + exception
          + ";"
-          + see("SocketTimeout"))
-          .initCause(exception);
+          + see("SocketTimeout"));
     } else if (exception instanceof NoRouteToHostException) {
-      return (NoRouteToHostException) new NoRouteToHostException(
+      return wrapWithMessage(exception,
           "No Route to Host from "
          + localHost + " to " + destHost + ":" + destPort
          + " failed on socket timeout exception: " + exception
          + ";"
-          + see("NoRouteToHost"))
-          .initCause(exception);
+          + see("NoRouteToHost"));
     }
     else {
       return (IOException) new IOException("Failed on local exception: "
@@ -770,6 +771,21 @@ public class NetUtils {
     return FOR_MORE_DETAILS_SEE + HADOOP_WIKI + entry;
   }
 
+  @SuppressWarnings("unchecked")
+  private static <T extends IOException> T wrapWithMessage(
+      T exception, String msg) {
+    Class<? extends Throwable> clazz = exception.getClass();
+    try {
+      Constructor<? extends Throwable> ctor = clazz.getConstructor(String.class);
+      Throwable t = ctor.newInstance(msg);
+      return (T)(t.initCause(exception));
+    } catch (Throwable e) {
+      LOG.warn("Unable to wrap exception of type " +
+          clazz + ": it has no (String) constructor", e);
+      return exception;
+    }
+  }
+
   /**
    * Get the host details as a string
    * @param destHost destinatioon host (nullable)
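
The wrapWithMessage helper introduced above replaces four copy-pasted cast-and-initCause chains: it reflectively re-creates the caught exception's own type with an enriched message, chains the original as the cause, and falls back to the unwrapped exception when the type has no (String) constructor. Crucially, this preserves subtypes, so a wrapped ConnectTimeoutException is still a ConnectTimeoutException when the retry policy inspects it. A standalone sketch of the same idiom (hypothetical WrapDemo class; only the reflective pattern is taken from the patch):

    import java.io.IOException;
    import java.lang.reflect.Constructor;

    final class WrapDemo {
      @SuppressWarnings("unchecked")
      static <T extends IOException> T wrap(T e, String msg) {
        try {
          // Rebuild the same concrete type so instanceof checks still hold.
          Constructor<? extends IOException> ctor =
              e.getClass().getConstructor(String.class);
          return (T) ctor.newInstance(msg).initCause(e);
        } catch (ReflectiveOperationException roe) {
          return e; // no (String) constructor: keep the original exception
        }
      }

      public static void main(String[] args) {
        IOException wrapped = wrap(new java.net.ConnectException("refused"),
            "Call from clientHost to server:8020 failed: connection refused");
        System.out.println(wrapped + " caused by " + wrapped.getCause());
      }
    }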

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 
 import java.util.Random;
@@ -586,7 +587,7 @@ public class TestIPC {
   private void assertRetriesOnSocketTimeouts(Configuration conf,
       int maxTimeoutRetries) throws IOException, InterruptedException {
     SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
-    doThrow(new SocketTimeoutException()).when(mockFactory).createSocket();
+    doThrow(new ConnectTimeoutException("fake")).when(mockFactory).createSocket();
     Client client = new Client(IntWritable.class, conf, mockFactory);
     InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
     try {

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -457,6 +457,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4462. 2NN will fail to checkpoint after an HDFS upgrade from a
     pre-federation version of HDFS. (atm)
 
+    HDFS-4404. Create file failure when the machine of first attempted NameNode
+    is down. (Todd Lipcon via atm)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java

@@ -23,22 +23,34 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import javax.net.SocketFactory;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.net.StandardSocketFactory;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestDFSClientFailover {
@@ -91,6 +103,63 @@ public class TestDFSClientFailover {
     fs.close();
   }
 
+  /**
+   * Test that even a non-idempotent method will properly fail-over if the
+   * first IPC attempt times out trying to connect. Regression test for
+   * HDFS-4404.
+   */
+  @Test
+  public void testFailoverOnConnectTimeout() throws Exception {
+    conf.setClass(CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+        InjectingSocketFactory.class, SocketFactory.class);
+    // Set up the InjectingSocketFactory to throw a ConnectTimeoutException
+    // when connecting to the first NN.
+    InjectingSocketFactory.portToInjectOn = cluster.getNameNodePort(0);
+
+    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+    // Make the second NN the active one.
+    cluster.shutdownNameNode(0);
+    cluster.transitionToActive(1);
+
+    // Call a non-idempotent method, and ensure the failover of the call proceeds
+    // successfully.
+    IOUtils.closeStream(fs.create(TEST_FILE));
+  }
+
+  private static class InjectingSocketFactory extends StandardSocketFactory {
+
+    static SocketFactory defaultFactory = SocketFactory.getDefault();
+
+    static int portToInjectOn;
+
+    @Override
+    public Socket createSocket() throws IOException {
+      Socket spy = Mockito.spy(defaultFactory.createSocket());
+      // Simplify our spying job by not having to also spy on the channel
+      Mockito.doReturn(null).when(spy).getChannel();
+      // Throw a ConnectTimeoutException when connecting to our target "bad"
+      // host.
+      Mockito.doThrow(new ConnectTimeoutException("injected"))
+          .when(spy).connect(
+              Mockito.argThat(new MatchesPort()),
+              Mockito.anyInt());
+      return spy;
+    }
+
+    private class MatchesPort extends BaseMatcher<SocketAddress> {
+      @Override
+      public boolean matches(Object arg0) {
+        return ((InetSocketAddress)arg0).getPort() == portToInjectOn;
+      }
+
+      @Override
+      public void describeTo(Description desc) {
+        desc.appendText("matches port " + portToInjectOn);
+      }
+    }
+  }
+
   /**
    * Regression test for HDFS-2683.
    */