HADOOP-13546. Override equals and hashCode of the default retry policy to avoid connection leakage. Contributed by Xiaobing Zhou.

(cherry picked from commit 08d8e0ba259f01465a83d8db09466dfd46b7ec81)
(cherry picked from commit 55600b38a81b822f2bdc5d71dbfb1b7336c2c024)
This commit is contained in:
Jing Zhao 2016-09-13 11:12:52 -07:00
parent 1ac8206a85
commit 26597aa59b
5 changed files with 429 additions and 42 deletions

View File

@ -183,6 +183,20 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " + return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
"and fail."); "and fail.");
} }
// All instances of this policy are stateless and interchangeable, so any two
// objects of the exact same class compare equal.  This lets the RPC client
// treat connections built with distinct instances as equivalent instead of
// opening (and leaking) a new connection per proxy (HADOOP-13546).
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else {
return obj != null && obj.getClass() == this.getClass();
}
}
// Consistent with equals() above: every instance of this class hashes alike,
// keyed on the class object itself.
@Override
public int hashCode() {
return this.getClass().hashCode();
}
} }
static class RetryForever implements RetryPolicy { static class RetryForever implements RetryPolicy {

View File

@ -22,6 +22,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -79,48 +80,85 @@ public static RetryPolicy getDefaultRetryPolicy(
//no retry //no retry
return RetryPolicies.TRY_ONCE_THEN_FAIL; return RetryPolicies.TRY_ONCE_THEN_FAIL;
} else { } else {
return new RetryPolicy() { return new WrapperRetryPolicy(
@Override (MultipleLinearRandomRetry) multipleLinearRandomRetry,
public RetryAction shouldRetry(Exception e, int retries, int failovers, remoteExceptionToRetry);
boolean isMethodIdempotent) throws Exception { }
if (e instanceof ServiceException) { }
//unwrap ServiceException
final Throwable cause = e.getCause();
if (cause != null && cause instanceof Exception) {
e = (Exception)cause;
}
}
//see (1) and (2) in the javadoc of this method. private static final class WrapperRetryPolicy implements RetryPolicy {
final RetryPolicy p; private MultipleLinearRandomRetry multipleLinearRandomRetry;
if (e instanceof RetriableException private String remoteExceptionToRetry;
|| RetryPolicies.getWrappedRetriableException(e) != null) {
// RetriableException or RetriableException wrapped
p = multipleLinearRandomRetry;
} else if (e instanceof RemoteException) {
final RemoteException re = (RemoteException)e;
p = remoteExceptionToRetry.equals(re.getClassName())?
multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
} else if (e instanceof IOException || e instanceof ServiceException) {
p = multipleLinearRandomRetry;
} else { //non-IOException
p = RetryPolicies.TRY_ONCE_THEN_FAIL;
}
if (LOG.isDebugEnabled()) { private WrapperRetryPolicy(
LOG.debug("RETRY " + retries + ") policy=" final MultipleLinearRandomRetry multipleLinearRandomRetry,
+ p.getClass().getSimpleName() + ", exception=" + e); final String remoteExceptionToRetry) {
} this.multipleLinearRandomRetry = multipleLinearRandomRetry;
return p.shouldRetry(e, retries, failovers, isMethodIdempotent); this.remoteExceptionToRetry = remoteExceptionToRetry;
}
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isMethodIdempotent) throws Exception {
if (e instanceof ServiceException) {
//unwrap ServiceException
final Throwable cause = e.getCause();
if (cause != null && cause instanceof Exception) {
e = (Exception)cause;
} }
}
@Override //see (1) and (2) in the javadoc of this method.
public String toString() { final RetryPolicy p;
return "RetryPolicy[" + multipleLinearRandomRetry + ", " if (e instanceof RetriableException
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() || RetryPolicies.getWrappedRetriableException(e) != null) {
+ "]"; // RetriableException or RetriableException wrapped
} p = multipleLinearRandomRetry;
}; } else if (e instanceof RemoteException) {
final RemoteException re = (RemoteException)e;
p = re.getClassName().equals(remoteExceptionToRetry)
? multipleLinearRandomRetry : RetryPolicies.TRY_ONCE_THEN_FAIL;
} else if (e instanceof IOException || e instanceof ServiceException) {
p = multipleLinearRandomRetry;
} else { //non-IOException
p = RetryPolicies.TRY_ONCE_THEN_FAIL;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RETRY " + retries + ") policy="
+ p.getClass().getSimpleName() + ", exception=" + e);
}
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
}
/**
 * Two wrappers are equal iff their underlying MultipleLinearRandomRetry
 * policies are equal.  remoteExceptionToRetry is ignored as part of equals
 * since it does not affect connection failure handling, so policies that
 * differ only in that field can still share a cached RPC connection.
 */
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
} else {
return (obj instanceof WrapperRetryPolicy)
&& this.multipleLinearRandomRetry
.equals(((WrapperRetryPolicy) obj).multipleLinearRandomRetry);
}
}
/**
 * Similarly, remoteExceptionToRetry is ignored as part of hashCode since it
 * does not affect connection failure handling; the hash is delegated to the
 * wrapped policy so it stays consistent with equals().
 */
@Override
public int hashCode() {
return multipleLinearRandomRetry.hashCode();
}
@Override
public String toString() {
return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + "]";
} }
} }

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.retry;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.junit.Test;
/**
 * This class mainly tests behaviors of various retry policies in connection
 * level.
 *
 * Equivalent policies must be equal and share a hash code so that the RPC
 * client can reuse a cached connection instead of leaking a new one per
 * proxy (HADOOP-13546).
 */
public class TestConnectionRetryPolicy {
  /** Shorthand that builds the policy with a fresh {@link Configuration}. */
  private static RetryPolicy getDefaultRetryPolicy(
      final boolean defaultRetryPolicyEnabled,
      final String defaultRetryPolicySpec,
      final String remoteExceptionToRetry) {
    return getDefaultRetryPolicy(
        new Configuration(),
        defaultRetryPolicyEnabled,
        defaultRetryPolicySpec,
        remoteExceptionToRetry);
  }

  /** Shorthand with no specific remote exception class to retry on. */
  private static RetryPolicy getDefaultRetryPolicy(
      final boolean defaultRetryPolicyEnabled,
      final String defaultRetryPolicySpec) {
    return getDefaultRetryPolicy(
        new Configuration(),
        defaultRetryPolicyEnabled,
        defaultRetryPolicySpec,
        "");
  }

  /**
   * Builds the default connection-level retry policy under test via
   * {@link RetryUtils#getDefaultRetryPolicy}.
   *
   * @param conf configuration passed through to RetryUtils
   * @param defaultRetryPolicyEnabled whether retries are enabled at all
   * @param defaultRetryPolicySpec sleep-time/retry-count spec, e.g.
   *        {@code "10000,2"}
   * @param remoteExceptionToRetry class name of the remote exception that
   *        should trigger a retry
   * @return the retry policy built from the given settings
   */
  public static RetryPolicy getDefaultRetryPolicy(
      final Configuration conf,
      final boolean defaultRetryPolicyEnabled,
      final String defaultRetryPolicySpec,
      final String remoteExceptionToRetry) {
    return RetryUtils.getDefaultRetryPolicy(
        conf,
        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
        defaultRetryPolicyEnabled,
        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
        defaultRetryPolicySpec,
        // BUG FIX: this argument used to be hard-coded to "", silently
        // discarding the caller's value; forward it so the parameter is
        // actually honored.
        remoteExceptionToRetry);
  }

  @Test(timeout = 60000)
  public void testDefaultRetryPolicyEquivalence() {
    RetryPolicy rp1 = null;
    RetryPolicy rp2 = null;
    RetryPolicy rp3 = null;

    /* test the same setting */
    rp1 = getDefaultRetryPolicy(true, "10000,2");
    rp2 = getDefaultRetryPolicy(true, "10000,2");
    rp3 = getDefaultRetryPolicy(true, "10000,2");
    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});

    /* test different remoteExceptionToRetry; it is ignored by equals() and
     * hashCode(), so the policies must still be equivalent */
    rp1 = getDefaultRetryPolicy(
        true,
        "10000,2",
        new RemoteException(
            PathIOException.class.getName(),
            "path IO exception").getClassName());
    rp2 = getDefaultRetryPolicy(
        true,
        "10000,2",
        new RemoteException(
            RpcNoSuchMethodException.class.getName(),
            "no such method exception").getClassName());
    rp3 = getDefaultRetryPolicy(
        true,
        "10000,2",
        new RemoteException(
            RetriableException.class.getName(),
            "retriable exception").getClassName());
    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});

    /* test enabled and different specifications */
    rp1 = getDefaultRetryPolicy(true, "20000,3");
    rp2 = getDefaultRetryPolicy(true, "30000,4");
    assertNotEquals("should not be equal", rp1, rp2);
    assertNotEquals(
        "should not have the same hash code",
        rp1.hashCode(),
        rp2.hashCode());

    /* test disabled and the same specifications; both fall back to
     * TRY_ONCE_THEN_FAIL and so compare equal */
    rp1 = getDefaultRetryPolicy(false, "40000,5");
    rp2 = getDefaultRetryPolicy(false, "40000,5");
    assertEquals("should be equal", rp1, rp2);
    // BUG FIX: previously compared the policy objects again instead of
    // their hash codes, despite the assertion message.
    assertEquals(
        "should have the same hash code",
        rp1.hashCode(),
        rp2.hashCode());

    /* test the disabled and different specifications */
    rp1 = getDefaultRetryPolicy(false, "50000,6");
    rp2 = getDefaultRetryPolicy(false, "60000,7");
    assertEquals("should be equal", rp1, rp2);
    // BUG FIX: same object-vs-hash-code mix-up as above.
    assertEquals(
        "should have the same hash code",
        rp1.hashCode(),
        rp2.hashCode());
  }

  /** @return a fresh stateless TryOnceThenFail policy instance. */
  public static RetryPolicy newTryOnceThenFail() {
    return new RetryPolicies.TryOnceThenFail();
  }

  @Test(timeout = 60000)
  public void testTryOnceThenFailEquivalence() throws Exception {
    final RetryPolicy rp1 = newTryOnceThenFail();
    final RetryPolicy rp2 = newTryOnceThenFail();
    final RetryPolicy rp3 = newTryOnceThenFail();
    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
  }

  /**
   * Asserts that every distinct pair in {@code policies} is equal and shares
   * the same hash code.
   */
  private void verifyRetryPolicyEquivalence(RetryPolicy[] policies) {
    for (int i = 0; i < policies.length; i++) {
      for (int j = 0; j < policies.length; j++) {
        if (i != j) {
          assertEquals("should be equal", policies[i], policies[j]);
          assertEquals(
              "should have the same hash code",
              policies[i].hashCode(),
              policies[j].hashCode());
        }
      }
    }
  }
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import static org.junit.Assert.*;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.TestConnectionRetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
import org.junit.Before;
import org.junit.Test;
/**
* This class mainly tests behaviors of reusing RPC connections for various
* retry policies.
*/
public class TestReuseRpcConnections extends TestRpcBase {
// Reset the shared test configuration before each case so policies built
// here do not leak settings into other tests.
@Before
public void setup() {
setupConf();
}
// Builds the default connection retry policy with no specific remote
// exception class to retry on.
private static RetryPolicy getDefaultRetryPolicy(
final boolean defaultRetryPolicyEnabled,
final String defaultRetryPolicySpec) {
return TestConnectionRetryPolicy.getDefaultRetryPolicy(
conf,
defaultRetryPolicyEnabled,
defaultRetryPolicySpec,
"");
}
// Same as above but with an explicit remote exception class name to retry.
private static RetryPolicy getDefaultRetryPolicy(
final boolean defaultRetryPolicyEnabled,
final String defaultRetryPolicySpec,
final String remoteExceptionToRetry) {
return TestConnectionRetryPolicy.getDefaultRetryPolicy(
conf,
defaultRetryPolicyEnabled,
defaultRetryPolicySpec,
remoteExceptionToRetry);
}
// Verifies that proxies whose connection retry policies are equal share one
// cached RPC connection, while a differing policy opens a second one.
@Test(timeout = 60000)
public void testDefaultRetryPolicyReuseConnections() throws Exception {
RetryPolicy rp1 = null;
RetryPolicy rp2 = null;
RetryPolicy rp3 = null;
/* test the same setting */
rp1 = getDefaultRetryPolicy(true, "10000,2");
rp2 = getDefaultRetryPolicy(true, "10000,2");
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
/* test enabled and different specifications */
rp1 = getDefaultRetryPolicy(true, "20000,3");
rp2 = getDefaultRetryPolicy(true, "20000,3");
rp3 = getDefaultRetryPolicy(true, "30000,4");
verifyRetryPolicyReuseConnections(rp1, rp2, rp3);
/* test disabled and the same specifications */
rp1 = getDefaultRetryPolicy(false, "40000,5");
rp2 = getDefaultRetryPolicy(false, "40000,5");
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
/* test disabled and different specifications */
rp1 = getDefaultRetryPolicy(false, "50000,6");
rp2 = getDefaultRetryPolicy(false, "60000,7");
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
/* test different remoteExceptionToRetry; that field is excluded from
   policy equality, so the connection is still expected to be reused */
rp1 = getDefaultRetryPolicy(
true,
"70000,8",
new RemoteException(
RpcNoSuchMethodException.class.getName(),
"no such method exception").getClassName());
rp2 = getDefaultRetryPolicy(
true,
"70000,8",
new RemoteException(
PathIOException.class.getName(),
"path IO exception").getClassName());
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
}
// Two distinct TryOnceThenFail instances should also share one connection.
@Test(timeout = 60000)
public void testRetryPolicyTryOnceThenFail() throws Exception {
final RetryPolicy rp1 = TestConnectionRetryPolicy.newTryOnceThenFail();
final RetryPolicy rp2 = TestConnectionRetryPolicy.newTryOnceThenFail();
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
}
// Core assertion helper: pings through three proxies against a live test
// server and checks the client's connection-id cache size after each —
// 1 connection after the two equivalent policies, 2 after the different one.
private void verifyRetryPolicyReuseConnections(
final RetryPolicy retryPolicy1,
final RetryPolicy retryPolicy2,
final RetryPolicy anotherRetryPolicy) throws Exception {
final Server server = setupTestServer(conf, 2);
final Configuration newConf = new Configuration(conf);
// NOTE(review): socket factory key is cleared, presumably so all proxies
// resolve the same default factory and connection ids can match — confirm.
newConf.set(
CommonConfigurationKeysPublic
.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
"");
Client client = null;
TestRpcService proxy1 = null;
TestRpcService proxy2 = null;
TestRpcService proxy3 = null;
try {
proxy1 = getClient(addr, newConf, retryPolicy1);
proxy1.ping(null, newEmptyRequest());
client = ProtobufRpcEngine.getClient(newConf);
final Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());
/*
 * another equivalent retry policy, reuse connection
 */
proxy2 = getClient(addr, newConf, retryPolicy2);
proxy2.ping(null, newEmptyRequest());
assertEquals("number of connections in cache is wrong", 1, conns.size());
/*
 * different retry policy, create a new connection
 */
proxy3 = getClient(addr, newConf, anotherRetryPolicy);
proxy3.ping(null, newEmptyRequest());
assertEquals("number of connections in cache is wrong", 2, conns.size());
} finally {
server.stop();
// this is dirty, but clear out connection cache for next run
if (client != null) {
client.getConnectionIds().clear();
}
if (proxy1 != null) {
RPC.stopProxy(proxy1);
}
if (proxy2 != null) {
RPC.stopProxy(proxy2);
}
if (proxy3 != null) {
RPC.stopProxy(proxy3);
}
}
}
}

View File

@ -30,18 +30,15 @@
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.junit.Assert;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
@ -132,6 +129,24 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
} }
} }
/**
 * Creates a client-side proxy for {@code TestRpcService} using the given
 * connection-level retry policy, so tests can control whether RPC
 * connections are shared between proxies.
 *
 * @param serverAddr address of the test RPC server
 * @param clientConf client-side configuration
 * @param connectionRetryPolicy retry policy passed to the protocol proxy
 * @return the client proxy
 * @throws ServiceException wrapping any IOException from proxy creation
 */
protected static TestRpcService getClient(InetSocketAddress serverAddr,
Configuration clientConf, final RetryPolicy connectionRetryPolicy)
throws ServiceException {
try {
return RPC.getProtocolProxy(
TestRpcService.class,
0,
serverAddr,
UserGroupInformation.getCurrentUser(),
clientConf,
NetUtils.getDefaultSocketFactory(clientConf),
RPC.getRpcTimeout(clientConf),
connectionRetryPolicy, null).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
}
protected static void stop(Server server, TestRpcService proxy) { protected static void stop(Server server, TestRpcService proxy) {
if (proxy != null) { if (proxy != null) {
try { try {