HADOOP-15655. Enhance KMS client retry behavior. Contributed by Kitti Nanasi.
This commit is contained in:
parent
865650052b
commit
a630a27c53
|
@ -113,8 +113,8 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
return providers;
|
return providers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T doOp(ProviderCallable<T> op, int currPos)
|
private <T> T doOp(ProviderCallable<T> op, int currPos,
|
||||||
throws IOException {
|
boolean isIdempotent) throws IOException {
|
||||||
if (providers.length == 0) {
|
if (providers.length == 0) {
|
||||||
throw new IOException("No providers configured !");
|
throw new IOException("No providers configured !");
|
||||||
}
|
}
|
||||||
|
@ -143,7 +143,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
}
|
}
|
||||||
RetryAction action = null;
|
RetryAction action = null;
|
||||||
try {
|
try {
|
||||||
action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false);
|
action = retryPolicy.shouldRetry(ioe, 0, numFailovers, isIdempotent);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof IOException) {
|
if (e instanceof IOException) {
|
||||||
throw (IOException)e;
|
throw (IOException)e;
|
||||||
|
@ -201,7 +201,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public Token<?>[] call(KMSClientProvider provider) throws IOException {
|
public Token<?>[] call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.addDelegationTokens(renewer, credentials);
|
return provider.addDelegationTokens(renewer, credentials);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -211,7 +211,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public Long call(KMSClientProvider provider) throws IOException {
|
public Long call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.renewDelegationToken(token);
|
return provider.renewDelegationToken(token);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -222,7 +222,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
provider.cancelDelegationToken(token);
|
provider.cancelDelegationToken(token);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This request is sent to all providers in the load-balancing group
|
// This request is sent to all providers in the load-balancing group
|
||||||
|
@ -275,7 +275,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
throws IOException, GeneralSecurityException {
|
throws IOException, GeneralSecurityException {
|
||||||
return provider.generateEncryptedKey(encryptionKeyName);
|
return provider.generateEncryptedKey(encryptionKeyName);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
} catch (WrapperException we) {
|
} catch (WrapperException we) {
|
||||||
if (we.getCause() instanceof GeneralSecurityException) {
|
if (we.getCause() instanceof GeneralSecurityException) {
|
||||||
throw (GeneralSecurityException) we.getCause();
|
throw (GeneralSecurityException) we.getCause();
|
||||||
|
@ -295,7 +295,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
throws IOException, GeneralSecurityException {
|
throws IOException, GeneralSecurityException {
|
||||||
return provider.decryptEncryptedKey(encryptedKeyVersion);
|
return provider.decryptEncryptedKey(encryptedKeyVersion);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
} catch (WrapperException we) {
|
} catch (WrapperException we) {
|
||||||
if (we.getCause() instanceof GeneralSecurityException) {
|
if (we.getCause() instanceof GeneralSecurityException) {
|
||||||
throw (GeneralSecurityException) we.getCause();
|
throw (GeneralSecurityException) we.getCause();
|
||||||
|
@ -315,7 +315,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
throws IOException, GeneralSecurityException {
|
throws IOException, GeneralSecurityException {
|
||||||
return provider.reencryptEncryptedKey(ekv);
|
return provider.reencryptEncryptedKey(ekv);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
} catch (WrapperException we) {
|
} catch (WrapperException we) {
|
||||||
if (we.getCause() instanceof GeneralSecurityException) {
|
if (we.getCause() instanceof GeneralSecurityException) {
|
||||||
throw (GeneralSecurityException) we.getCause();
|
throw (GeneralSecurityException) we.getCause();
|
||||||
|
@ -335,7 +335,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
provider.reencryptEncryptedKeys(ekvs);
|
provider.reencryptEncryptedKeys(ekvs);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
} catch (WrapperException we) {
|
} catch (WrapperException we) {
|
||||||
if (we.getCause() instanceof GeneralSecurityException) {
|
if (we.getCause() instanceof GeneralSecurityException) {
|
||||||
throw (GeneralSecurityException) we.getCause();
|
throw (GeneralSecurityException) we.getCause();
|
||||||
|
@ -351,7 +351,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.getKeyVersion(versionName);
|
return provider.getKeyVersion(versionName);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -361,7 +361,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public List<String> call(KMSClientProvider provider) throws IOException {
|
public List<String> call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.getKeys();
|
return provider.getKeys();
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -371,7 +371,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public Metadata[] call(KMSClientProvider provider) throws IOException {
|
public Metadata[] call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.getKeysMetadata(names);
|
return provider.getKeysMetadata(names);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -382,7 +382,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return provider.getKeyVersions(name);
|
return provider.getKeyVersions(name);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -392,8 +392,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.getCurrentKey(name);
|
return provider.getCurrentKey(name);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Metadata getMetadata(final String name) throws IOException {
|
public Metadata getMetadata(final String name) throws IOException {
|
||||||
return doOp(new ProviderCallable<Metadata>() {
|
return doOp(new ProviderCallable<Metadata>() {
|
||||||
|
@ -401,7 +402,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public Metadata call(KMSClientProvider provider) throws IOException {
|
public Metadata call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.getMetadata(name);
|
return provider.getMetadata(name);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -412,7 +413,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.createKey(name, material, options);
|
return provider.createKey(name, material, options);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -425,7 +426,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
NoSuchAlgorithmException {
|
NoSuchAlgorithmException {
|
||||||
return provider.createKey(name, options);
|
return provider.createKey(name, options);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
} catch (WrapperException e) {
|
} catch (WrapperException e) {
|
||||||
if (e.getCause() instanceof GeneralSecurityException) {
|
if (e.getCause() instanceof GeneralSecurityException) {
|
||||||
throw (NoSuchAlgorithmException) e.getCause();
|
throw (NoSuchAlgorithmException) e.getCause();
|
||||||
|
@ -442,7 +443,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
provider.deleteKey(name);
|
provider.deleteKey(name);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -453,7 +454,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
public KeyVersion call(KMSClientProvider provider) throws IOException {
|
||||||
return provider.rollNewVersion(name, material);
|
return provider.rollNewVersion(name, material);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
invalidateCache(name);
|
invalidateCache(name);
|
||||||
return newVersion;
|
return newVersion;
|
||||||
}
|
}
|
||||||
|
@ -468,7 +469,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
||||||
NoSuchAlgorithmException {
|
NoSuchAlgorithmException {
|
||||||
return provider.rollNewVersion(name);
|
return provider.rollNewVersion(name);
|
||||||
}
|
}
|
||||||
}, nextIdx());
|
}, nextIdx(), false);
|
||||||
invalidateCache(name);
|
invalidateCache(name);
|
||||||
return newVersion;
|
return newVersion;
|
||||||
} catch (WrapperException e) {
|
} catch (WrapperException e) {
|
||||||
|
|
|
@ -29,10 +29,13 @@ import static org.mockito.Mockito.verify;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.NoRouteToHostException;
|
import java.net.NoRouteToHostException;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import javax.net.ssl.SSLHandshakeException;
|
import javax.net.ssl.SSLHandshakeException;
|
||||||
|
|
||||||
|
@ -355,24 +358,27 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests whether retryPolicy fails immediately, after trying each provider
|
* Tests whether retryPolicy fails immediately on non-idempotent operations,
|
||||||
* once, on encountering IOException which is not SocketException.
|
* after trying each provider once,
|
||||||
|
* on encountering IOException which is not SocketException.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testClientRetriesWithIOException() throws Exception {
|
public void testClientRetriesNonIdempotentOpWithIOExceptionFailsImmediately()
|
||||||
|
throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
final String keyName = "test";
|
||||||
// Setting total failover attempts to .
|
// Setting total failover attempts to .
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
|
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
|
||||||
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
||||||
when(p1.getMetadata(Mockito.anyString()))
|
when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
.thenThrow(new IOException("p1"));
|
.thenThrow(new IOException("p1"));
|
||||||
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
||||||
when(p2.getMetadata(Mockito.anyString()))
|
when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
.thenThrow(new IOException("p2"));
|
.thenThrow(new IOException("p2"));
|
||||||
KMSClientProvider p3 = mock(KMSClientProvider.class);
|
KMSClientProvider p3 = mock(KMSClientProvider.class);
|
||||||
when(p3.getMetadata(Mockito.anyString()))
|
when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
.thenThrow(new IOException("p3"));
|
.thenThrow(new IOException("p3"));
|
||||||
|
|
||||||
when(p1.getKMSUrl()).thenReturn("p1");
|
when(p1.getKMSUrl()).thenReturn("p1");
|
||||||
|
@ -381,17 +387,61 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
new KMSClientProvider[] {p1, p2, p3}, 0, conf);
|
new KMSClientProvider[] {p1, p2, p3}, 0, conf);
|
||||||
try {
|
try {
|
||||||
kp.getMetadata("test3");
|
kp.createKey(keyName, new Options(conf));
|
||||||
fail("Should fail since all providers threw an IOException");
|
fail("Should fail since all providers threw an IOException");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assertTrue(e instanceof IOException);
|
assertTrue(e instanceof IOException);
|
||||||
}
|
}
|
||||||
verify(kp.getProviders()[0], Mockito.times(1))
|
verify(kp.getProviders()[0], Mockito.times(1))
|
||||||
.getMetadata(Mockito.eq("test3"));
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
verify(kp.getProviders()[1], Mockito.times(1))
|
verify(kp.getProviders()[1], Mockito.times(1))
|
||||||
.getMetadata(Mockito.eq("test3"));
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
verify(kp.getProviders()[2], Mockito.times(1))
|
verify(kp.getProviders()[2], Mockito.times(1))
|
||||||
.getMetadata(Mockito.eq("test3"));
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests whether retryPolicy retries on idempotent operations
|
||||||
|
* when encountering IOException.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testClientRetriesIdempotentOpWithIOExceptionSucceedsSecondTime()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final String keyName = "test";
|
||||||
|
final KeyProvider.KeyVersion keyVersion
|
||||||
|
= new KMSClientProvider.KMSKeyVersion(keyName, "v1",
|
||||||
|
new byte[0]);
|
||||||
|
// Setting total failover attempts to .
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
|
||||||
|
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
||||||
|
when(p1.getCurrentKey(Mockito.anyString()))
|
||||||
|
.thenThrow(new IOException("p1"))
|
||||||
|
.thenReturn(keyVersion);
|
||||||
|
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
||||||
|
when(p2.getCurrentKey(Mockito.anyString()))
|
||||||
|
.thenThrow(new IOException("p2"));
|
||||||
|
KMSClientProvider p3 = mock(KMSClientProvider.class);
|
||||||
|
when(p3.getCurrentKey(Mockito.anyString()))
|
||||||
|
.thenThrow(new IOException("p3"));
|
||||||
|
|
||||||
|
when(p1.getKMSUrl()).thenReturn("p1");
|
||||||
|
when(p2.getKMSUrl()).thenReturn("p2");
|
||||||
|
when(p3.getKMSUrl()).thenReturn("p3");
|
||||||
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
|
new KMSClientProvider[] {p1, p2, p3}, 0, conf);
|
||||||
|
|
||||||
|
KeyProvider.KeyVersion result = kp.getCurrentKey(keyName);
|
||||||
|
|
||||||
|
assertEquals(keyVersion, result);
|
||||||
|
verify(kp.getProviders()[0], Mockito.times(2))
|
||||||
|
.getCurrentKey(Mockito.eq(keyName));
|
||||||
|
verify(kp.getProviders()[1], Mockito.times(1))
|
||||||
|
.getCurrentKey(Mockito.eq(keyName));
|
||||||
|
verify(kp.getProviders()[2], Mockito.times(1))
|
||||||
|
.getCurrentKey(Mockito.eq(keyName));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -717,4 +767,115 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName),
|
verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName),
|
||||||
Mockito.any(Options.class));
|
Mockito.any(Options.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that if an idempotent operation succeeds second time after
|
||||||
|
* SocketTimeoutException, then the operation is successful.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionSucceeds()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3);
|
||||||
|
final List<String> keys = Arrays.asList("testKey");
|
||||||
|
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
||||||
|
when(p1.getKeys())
|
||||||
|
.thenThrow(new SocketTimeoutException("p1"))
|
||||||
|
.thenReturn(keys);
|
||||||
|
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
||||||
|
when(p2.getKeys()).thenThrow(new SocketTimeoutException("p2"));
|
||||||
|
|
||||||
|
when(p1.getKMSUrl()).thenReturn("p1");
|
||||||
|
when(p2.getKMSUrl()).thenReturn("p2");
|
||||||
|
|
||||||
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
|
new KMSClientProvider[] {p1, p2}, 0, conf);
|
||||||
|
|
||||||
|
List<String> result = kp.getKeys();
|
||||||
|
assertEquals(keys, result);
|
||||||
|
verify(p1, Mockito.times(2)).getKeys();
|
||||||
|
verify(p2, Mockito.times(1)).getKeys();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that if a non idempotent operation fails at every attempt
|
||||||
|
* after SocketTimeoutException, then SocketTimeoutException is thrown.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionFails()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 2);
|
||||||
|
final String keyName = "test";
|
||||||
|
final String exceptionMessage = "p1 exception message";
|
||||||
|
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
||||||
|
Exception originalEx = new SocketTimeoutException(exceptionMessage);
|
||||||
|
when(p1.getKeyVersions(Mockito.anyString()))
|
||||||
|
.thenThrow(originalEx);
|
||||||
|
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
||||||
|
when(p2.getKeyVersions(Mockito.anyString()))
|
||||||
|
.thenThrow(new SocketTimeoutException("p2 exception message"));
|
||||||
|
|
||||||
|
when(p1.getKMSUrl()).thenReturn("p1");
|
||||||
|
when(p2.getKMSUrl()).thenReturn("p2");
|
||||||
|
|
||||||
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
|
new KMSClientProvider[] {p1, p2}, 0, conf);
|
||||||
|
|
||||||
|
Exception interceptedEx = intercept(SocketTimeoutException.class,
|
||||||
|
"SocketTimeoutException: " + exceptionMessage,
|
||||||
|
()-> kp.getKeyVersions(keyName));
|
||||||
|
assertEquals(originalEx, interceptedEx);
|
||||||
|
|
||||||
|
verify(p1, Mockito.times(2))
|
||||||
|
.getKeyVersions(Mockito.eq(keyName));
|
||||||
|
verify(p2, Mockito.times(1))
|
||||||
|
.getKeyVersions(Mockito.eq(keyName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests whether retryPolicy fails immediately on non-idempotent operations,
|
||||||
|
* after trying each provider once, on encountering SocketTimeoutException.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final String keyName = "test";
|
||||||
|
// Setting total failover attempts to .
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
|
||||||
|
KMSClientProvider p1 = mock(KMSClientProvider.class);
|
||||||
|
when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
|
.thenThrow(new SocketTimeoutException("p1"));
|
||||||
|
KMSClientProvider p2 = mock(KMSClientProvider.class);
|
||||||
|
when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
|
.thenThrow(new SocketTimeoutException("p2"));
|
||||||
|
KMSClientProvider p3 = mock(KMSClientProvider.class);
|
||||||
|
when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
|
||||||
|
.thenThrow(new SocketTimeoutException("p3"));
|
||||||
|
|
||||||
|
when(p1.getKMSUrl()).thenReturn("p1");
|
||||||
|
when(p2.getKMSUrl()).thenReturn("p2");
|
||||||
|
when(p3.getKMSUrl()).thenReturn("p3");
|
||||||
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
|
new KMSClientProvider[] {p1, p2, p3}, 0, conf);
|
||||||
|
try {
|
||||||
|
kp.createKey(keyName, new Options(conf));
|
||||||
|
fail("Should fail since all providers threw a SocketTimeoutException");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertTrue(e instanceof SocketTimeoutException);
|
||||||
|
}
|
||||||
|
verify(kp.getProviders()[0], Mockito.times(1))
|
||||||
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
|
verify(kp.getProviders()[1], Mockito.times(1))
|
||||||
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
|
verify(kp.getProviders()[2], Mockito.times(1))
|
||||||
|
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue