HDFS-14161. RBF: Throw StandbyException instead of IOException so that client can retry when can not get connection. Contributed by Fei Hui.
This commit is contained in:
parent
1dc01e59af
commit
f3cbf0eb9a
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception when can not get a non-null connection.
|
||||||
|
*/
|
||||||
|
public class ConnectionNullException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public ConnectionNullException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -270,7 +270,8 @@ public class RouterRpcClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
throw new IOException("Cannot get a connection to " + rpcAddress);
|
throw new ConnectionNullException("Cannot get a connection to "
|
||||||
|
+ rpcAddress);
|
||||||
}
|
}
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
@ -363,9 +364,9 @@ public class RouterRpcClient {
|
||||||
Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
|
Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
|
||||||
for (FederationNamenodeContext namenode : namenodes) {
|
for (FederationNamenodeContext namenode : namenodes) {
|
||||||
ConnectionContext connection = null;
|
ConnectionContext connection = null;
|
||||||
|
String nsId = namenode.getNameserviceId();
|
||||||
|
String rpcAddress = namenode.getRpcAddress();
|
||||||
try {
|
try {
|
||||||
String nsId = namenode.getNameserviceId();
|
|
||||||
String rpcAddress = namenode.getRpcAddress();
|
|
||||||
connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
|
connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
|
||||||
ProxyAndInfo<?> client = connection.getClient();
|
ProxyAndInfo<?> client = connection.getClient();
|
||||||
final Object proxy = client.getProxy();
|
final Object proxy = client.getProxy();
|
||||||
|
@ -394,6 +395,16 @@ public class RouterRpcClient {
|
||||||
}
|
}
|
||||||
// RemoteException returned by NN
|
// RemoteException returned by NN
|
||||||
throw (RemoteException) ioe;
|
throw (RemoteException) ioe;
|
||||||
|
} else if (ioe instanceof ConnectionNullException) {
|
||||||
|
if (this.rpcMonitor != null) {
|
||||||
|
this.rpcMonitor.proxyOpFailureCommunicate();
|
||||||
|
}
|
||||||
|
LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
|
||||||
|
ioe.getMessage());
|
||||||
|
// Throw StandbyException so that client can retry
|
||||||
|
StandbyException se = new StandbyException(ioe.getMessage());
|
||||||
|
se.initCause(ioe);
|
||||||
|
throw se;
|
||||||
} else {
|
} else {
|
||||||
// Other communication error, this is a failure
|
// Other communication error, this is a failure
|
||||||
// Communication retries are handled by the retry policy
|
// Communication retries are handled by the retry policy
|
||||||
|
@ -425,7 +436,8 @@ public class RouterRpcClient {
|
||||||
String addr = namenode.getRpcAddress();
|
String addr = namenode.getRpcAddress();
|
||||||
IOException ioe = entry.getValue();
|
IOException ioe = entry.getValue();
|
||||||
if (ioe instanceof StandbyException) {
|
if (ioe instanceof StandbyException) {
|
||||||
LOG.error("{} {} at {} is in Standby", nsId, nnId, addr);
|
LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr,
|
||||||
|
ioe.getMessage());
|
||||||
} else {
|
} else {
|
||||||
LOG.error("{} {} at {} error: \"{}\"",
|
LOG.error("{} {} at {} error: \"{}\"",
|
||||||
nsId, nnId, addr, ioe.getMessage());
|
nsId, nnId, addr, ioe.getMessage());
|
||||||
|
|
|
@ -52,6 +52,9 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
|
@ -60,6 +63,7 @@ import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.Whitebox;
|
import org.apache.hadoop.test.Whitebox;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
@ -343,4 +347,31 @@ public final class FederationTestUtils {
|
||||||
}
|
}
|
||||||
}, 100, timeout);
|
}, 100, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate that a RouterRpcServer, the ConnectionManager of its
|
||||||
|
* RouterRpcClient throws IOException when call getConnection. So the
|
||||||
|
* RouterRpcClient will get a null Connection.
|
||||||
|
* @param server RouterRpcServer
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void simulateThrowExceptionRouterRpcServer(
|
||||||
|
final RouterRpcServer server) throws IOException {
|
||||||
|
RouterRpcClient rpcClient = server.getRPCClient();
|
||||||
|
ConnectionManager connectionManager =
|
||||||
|
new ConnectionManager(server.getConfig());
|
||||||
|
ConnectionManager spyConnectionManager = spy(connectionManager);
|
||||||
|
doAnswer(new Answer() {
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
LOG.info("Simulating connectionManager throw IOException {}",
|
||||||
|
invocation.getMock());
|
||||||
|
throw new IOException("Simulate connectionManager throw IOException");
|
||||||
|
}
|
||||||
|
}).when(spyConnectionManager).getConnection(
|
||||||
|
any(UserGroupInformation.class), any(String.class), any(Class.class));
|
||||||
|
|
||||||
|
Whitebox.setInternalState(rpcClient, "connectionManager",
|
||||||
|
spyConnectionManager);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -240,4 +241,43 @@ public class TestRouterClientRejectOverload {
|
||||||
num <= expOverloadMax);
|
num <= expOverloadMax);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionNullException() throws Exception {
|
||||||
|
setupCluster(false);
|
||||||
|
|
||||||
|
// Choose 1st router
|
||||||
|
RouterContext routerContext = cluster.getRouters().get(0);
|
||||||
|
Router router = routerContext.getRouter();
|
||||||
|
// This router will throw ConnectionNullException
|
||||||
|
simulateThrowExceptionRouterRpcServer(router.getRpcServer());
|
||||||
|
|
||||||
|
// Set dfs.client.failover.random.order false, to pick 1st router at first
|
||||||
|
Configuration conf = cluster.getRouterClientConf();
|
||||||
|
conf.setBoolean("dfs.client.failover.random.order", false);
|
||||||
|
// Client to access Router Cluster
|
||||||
|
DFSClient routerClient =
|
||||||
|
new DFSClient(new URI("hdfs://fed"), conf);
|
||||||
|
|
||||||
|
// Get router0 metrics
|
||||||
|
FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
|
||||||
|
.getRouter().getRpcServer().getRPCMetrics();
|
||||||
|
// Get router1 metrics
|
||||||
|
FederationRPCMetrics rpcMetrics1 = cluster.getRouters().get(1)
|
||||||
|
.getRouter().getRpcServer().getRPCMetrics();
|
||||||
|
|
||||||
|
// Original failures
|
||||||
|
long originalRouter0Failures = rpcMetrics0.getProxyOpFailureCommunicate();
|
||||||
|
long originalRouter1Failures = rpcMetrics1.getProxyOpFailureCommunicate();
|
||||||
|
|
||||||
|
// RPC call must be successful
|
||||||
|
routerClient.getFileInfo("/");
|
||||||
|
|
||||||
|
// Router 0 failures will increase
|
||||||
|
assertEquals(originalRouter0Failures + 1,
|
||||||
|
rpcMetrics0.getProxyOpFailureCommunicate());
|
||||||
|
// Router 1 failures will not change
|
||||||
|
assertEquals(originalRouter1Failures,
|
||||||
|
rpcMetrics1.getProxyOpFailureCommunicate());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue