Improve threadpool usage and error handling for API key validation (#58090) (#59047)

The PR introduces following two changes:

Move API key validation into a new separate threadpool. The new threadpool is created separately with half of the available processors and 1000 in queue size. We could combine it with the existing TokenService's threadpool. Technically it is straightforward, but I am not sure whether it could be a rushed optimization since I am not clear about potential impact on the token service.

On threadpoool saturation, it now fails with EsRejectedExecutionException which in turns gives back a 429, instead of 401 status code to users.
This commit is contained in:
Yang Wang 2020-07-06 21:21:07 +10:00 committed by GitHub
parent 4a791e835b
commit 66c0231895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 248 additions and 30 deletions

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -289,6 +290,8 @@ import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECU
public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin,
DiscoveryPlugin, MapperPlugin, ExtensiblePlugin {
public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto";
private static final Logger logger = LogManager.getLogger(Security.class);
private final Settings settings;
@ -1027,8 +1030,14 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
if (enabled && transportClientMode == false) {
return Collections.singletonList(
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool"));
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
return org.elasticsearch.common.collect.List.of(
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000,
"xpack.security.authc.token.thread_pool", false),
new FixedExecutorBuilder(settings, SECURITY_CRYPTO_THREAD_POOL_NAME,
(allocatedProcessors + 1) / 2, 1000,
"xpack.security.crypto.thread_pool", false)
);
}
return Collections.emptyList();
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -42,6 +43,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -106,6 +108,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationType;
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
public class ApiKeyService {
@ -336,14 +339,26 @@ public class ApiKeyService {
executeAsyncWithOrigin(ctx, SECURITY_ORIGIN, getRequest, ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
final Map<String, Object> source = response.getSource();
validateApiKeyCredentials(docId, source, credentials, clock, listener);
validateApiKeyCredentials(docId, source, credentials, clock, ActionListener.delegateResponse(listener, (l, e) -> {
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
} else {
listener.onFailure(e);
}
}));
} else {
listener.onResponse(
AuthenticationResult.unsuccessful("unable to find apikey with id " + credentials.getId(), null));
}
},
e -> listener.onResponse(AuthenticationResult.unsuccessful(
"apikey authentication for id " + credentials.getId() + " encountered a failure", e))),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
} else {
listener.onResponse(AuthenticationResult.unsuccessful(
"apikey authentication for id " + credentials.getId() + " encountered a failure",e));
}
}),
client::get);
}
@ -476,7 +491,8 @@ public class ApiKeyService {
}, listener::onFailure),
threadPool.generic(), threadPool.getThreadContext());
} else {
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
verified -> {
listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
if (verified) {
// move on
@ -484,15 +500,22 @@ public class ApiKeyService {
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
}, listener::onFailure
));
}
} else {
final boolean verified = verifyKeyAgainstHash(apiKeyHash, credentials);
verifyKeyAgainstHash(apiKeyHash, credentials, ActionListener.wrap(
verified -> {
if (verified) {
// move on
validateApiKeyExpiration(source, credentials, clock, listener);
} else {
listener.onResponse(AuthenticationResult.unsuccessful("invalid credentials", null));
}
},
listener::onFailure
));
}
}
}
@ -560,14 +583,16 @@ public class ApiKeyService {
}
// Protected instance method so this can be mocked
protected boolean verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials) {
protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener<Boolean> listener) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
try {
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
return hasher.verify(credentials.getKey(), apiKeyHashChars);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
}));
}
private Instant getApiKeyExpiration(Instant now, CreateApiKeyRequest request) {

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.security.authc;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -16,11 +17,16 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.SecurityIntegTestCase;
@ -43,6 +49,7 @@ import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@ -53,13 +60,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -82,6 +93,11 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
.build();
}
@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}
@Before
public void waitForSecurityIndexWritable() throws Exception {
assertSecurityIndexActive();
@ -839,6 +855,64 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
assertApiKeyNotCreated(client,"key-5");
}
public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException {
final String nodeName = randomFrom(internalCluster().getNodeNames());
final Settings settings = internalCluster().getInstance(Settings.class, nodeName);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
final RoleDescriptor descriptor = new RoleDescriptor("auth_only", new String[] { }, null, null);
final Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
SecurityClient securityClient = new SecurityClient(client);
final CreateApiKeyResponse createApiKeyResponse = securityClient.prepareCreateApiKey()
.setName("auth only key")
.setRoleDescriptors(Collections.singletonList(descriptor))
.get();
assertNotNull(createApiKeyResponse.getId());
assertNotNull(createApiKeyResponse.getKey());
final List<NodeInfo> nodeInfos = client().admin().cluster().prepareNodesInfo().get().getNodes().stream()
.filter(nodeInfo -> nodeInfo.getNode().getName().equals(nodeName))
.collect(Collectors.toList());
final ExecutorService executorService = threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME);
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final int numberOfThreads = (allocatedProcessors + 1) / 2;
final CountDownLatch blockingLatch = new CountDownLatch(1);
final CountDownLatch readyLatch = new CountDownLatch(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(() -> {
readyLatch.countDown();
try {
blockingLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
// Fill the whole queue for the crypto thread pool
final int queueSize = 1000;
IntStream.range(0, queueSize).forEach(i -> executorService.submit(() -> {}));
readyLatch.await();
try (RestClient restClient = createRestClient(nodeInfos, null, "http")) {
final String base64ApiKeyKeyValue = Base64.getEncoder().encodeToString(
(createApiKeyResponse.getId() + ":" + createApiKeyResponse.getKey().toString()).getBytes(StandardCharsets.UTF_8));
final Request authRequest = new Request("GET", "_security/_authenticate");
authRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(
"Authorization", "ApiKey " + base64ApiKeyKeyValue).build());
final ResponseException responseException = expectThrows(ResponseException.class, () -> restClient.performRequest(authRequest));
assertThat(responseException.getMessage(), containsString("429 Too Many Requests"));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429));
} finally {
blockingLatch.countDown();
}
}
private void assertApiKeyNotCreated(Client client, String keyName) throws ExecutionException, InterruptedException {
new RefreshRequestBuilder(client, RefreshAction.INSTANCE).setIndices(SECURITY_MAIN_ALIAS).execute().get();
PlainActionFuture<GetApiKeyResponse> getApiKeyResponseListener = new PlainActionFuture<>();

View File

@ -15,6 +15,8 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -25,6 +27,7 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.license.XPackLicenseState.Feature;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackSettings;
@ -59,11 +62,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.test.TestMatchers.throwableWithMessage;
import static org.elasticsearch.xpack.core.security.authz.store.ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
@ -88,7 +94,11 @@ public class ApiKeyServiceTests extends ESTestCase {
@Before
public void createThreadPool() {
threadPool = new TestThreadPool("api key service tests");
threadPool = Mockito.spy(
new TestThreadPool("api key service tests",
new FixedExecutorBuilder(Settings.EMPTY, SECURITY_CRYPTO_THREAD_POOL_NAME, 1, 1000,
"xpack.security.authc.api_key.thread_pool", false))
);
}
@After
@ -526,7 +536,7 @@ public class ApiKeyServiceTests extends ESTestCase {
hashCounter.incrementAndGet();
hashWait.acquire();
return invocationOnMock.callRealMethod();
}).when(service).verifyKeyAgainstHash(any(String.class), any(ApiKeyCredentials.class));
}).when(service).verifyKeyAgainstHash(any(String.class), any(ApiKeyCredentials.class), any(ActionListener.class));
final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
final PlainActionFuture<AuthenticationResult> future1 = new PlainActionFuture<>();
@ -602,6 +612,91 @@ public class ApiKeyServiceTests extends ESTestCase {
assertEquals("looked_up_by_type", ApiKeyService.getCreatorRealmType(authentication));
}
public void testAuthWillTerminateIfGetThreadPoolIsSaturated() throws ExecutionException, InterruptedException {
final String apiKey = randomAlphaOfLength(16);
final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
writeCredentialsToThreadContext(creds);
SecurityMocks.mockGetRequestException(client, new EsRejectedExecutionException("rejected"));
ApiKeyService service = createApiKeyService(Settings.EMPTY);
final PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
service.authenticateWithApiKeyIfPresent(threadPool.getThreadContext(), future);
final AuthenticationResult authenticationResult = future.get();
assertEquals(AuthenticationResult.Status.TERMINATE, authenticationResult.getStatus());
assertThat(authenticationResult.getMessage(), containsString("server is too busy to respond"));
}
public void testAuthWillTerminateIfHashingThreadPoolIsSaturated() throws IOException, ExecutionException, InterruptedException {
final String apiKey = randomAlphaOfLength(16);
final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
writeCredentialsToThreadContext(creds);
Hasher hasher = randomFrom(Hasher.PBKDF2, Hasher.BCRYPT4, Hasher.BCRYPT);
final char[] hash = hasher.hash(new SecureString(apiKey.toCharArray()));
Map<String, Object> sourceMap = buildApiKeySourceDoc(hash);
mockSourceDocument(creds.getId(), sourceMap);
final ExecutorService mockExecutorService = mock(ExecutorService.class);
when(threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME)).thenReturn(mockExecutorService);
Mockito.doAnswer(invocationOnMock -> {
final AbstractRunnable actionRunnable = (AbstractRunnable) invocationOnMock.getArguments()[0];
actionRunnable.onRejection(new EsRejectedExecutionException("rejected"));
return null;
}).when(mockExecutorService).execute(any(Runnable.class));
ApiKeyService service = createApiKeyService(Settings.EMPTY);
final PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
service.authenticateWithApiKeyIfPresent(threadPool.getThreadContext(), future);
final AuthenticationResult authenticationResult = future.get();
assertEquals(AuthenticationResult.Status.TERMINATE, authenticationResult.getStatus());
assertThat(authenticationResult.getMessage(), containsString("server is too busy to respond"));
}
public void testCachedApiKeyValidationWillNotBeBlockedByUnCachedApiKey() throws IOException, ExecutionException, InterruptedException {
final String apiKey1 = randomAlphaOfLength(16);
final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey1.toCharArray()));
writeCredentialsToThreadContext(creds);
Hasher hasher = randomFrom(Hasher.PBKDF2, Hasher.BCRYPT4, Hasher.BCRYPT);
final char[] hash = hasher.hash(new SecureString(apiKey1.toCharArray()));
Map<String, Object> sourceMap = buildApiKeySourceDoc(hash);
mockSourceDocument(creds.getId(), sourceMap);
// Authenticate the key once to cache it
ApiKeyService service = createApiKeyService(Settings.EMPTY);
final PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
service.authenticateWithApiKeyIfPresent(threadPool.getThreadContext(), future);
final AuthenticationResult authenticationResult = future.get();
assertEquals(AuthenticationResult.Status.SUCCESS, authenticationResult.getStatus());
// Now force the hashing thread pool to saturate so that any un-cached keys cannot be validated
final ExecutorService mockExecutorService = mock(ExecutorService.class);
when(threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME)).thenReturn(mockExecutorService);
Mockito.doAnswer(invocationOnMock -> {
final AbstractRunnable actionRunnable = (AbstractRunnable) invocationOnMock.getArguments()[0];
actionRunnable.onRejection(new EsRejectedExecutionException("rejected"));
return null;
}).when(mockExecutorService).execute(any(Runnable.class));
// A new API key trying to connect that must go through full hash computation
final String apiKey2 = randomAlphaOfLength(16);
final ApiKeyCredentials creds2 = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey2.toCharArray()));
mockSourceDocument(creds2.getId(), buildApiKeySourceDoc(hasher.hash(new SecureString(apiKey2.toCharArray()))));
final PlainActionFuture<AuthenticationResult> future2 = new PlainActionFuture<>();
final ThreadContext.StoredContext storedContext = threadPool.getThreadContext().stashContext();
writeCredentialsToThreadContext(creds2);
service.authenticateWithApiKeyIfPresent(threadPool.getThreadContext(), future2);
final AuthenticationResult authenticationResult2 = future2.get();
assertEquals(AuthenticationResult.Status.TERMINATE, authenticationResult2.getStatus());
assertThat(authenticationResult2.getMessage(), containsString("server is too busy to respond"));
// The cached API key should not be affected
mockSourceDocument(creds.getId(), sourceMap);
final PlainActionFuture<AuthenticationResult> future3 = new PlainActionFuture<>();
storedContext.restore();
service.authenticateWithApiKeyIfPresent(threadPool.getThreadContext(), future3);
final AuthenticationResult authenticationResult3 = future3.get();
assertEquals(AuthenticationResult.Status.SUCCESS, authenticationResult3.getStatus());
}
private ApiKeyService createApiKeyService(Settings baseSettings) {
final Settings settings = Settings.builder()
.put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true)

View File

@ -106,6 +106,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_T
import static org.elasticsearch.test.SecurityTestsUtils.assertAuthenticationException;
import static org.elasticsearch.test.TestMatchers.throwableWithMessage;
import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError;
import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.security.authc.TokenService.THREAD_POOL_NAME;
import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockGetTokenFromId;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.contains;
@ -206,7 +208,11 @@ public class AuthenticationServiceTests extends ESTestCase {
auditTrailService = new AuditTrailService(Collections.singletonList(auditTrail), licenseState);
client = mock(Client.class);
threadPool = new ThreadPool(settings,
new FixedExecutorBuilder(settings, TokenService.THREAD_POOL_NAME, 1, 1000, "xpack.security.authc.token.thread_pool"));
new FixedExecutorBuilder(settings, THREAD_POOL_NAME, 1, 1000,
"xpack.security.authc.token.thread_pool", false),
new FixedExecutorBuilder(Settings.EMPTY, SECURITY_CRYPTO_THREAD_POOL_NAME, 1, 1000,
"xpack.security.authc.api_key.thread_pool", false)
);
threadContext = threadPool.getThreadContext();
when(client.threadPool()).thenReturn(threadPool);
when(client.settings()).thenReturn(settings);

View File

@ -125,6 +125,15 @@ public final class SecurityMocks {
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
}
public static void mockGetRequestException(Client client, Exception e) {
when(client.prepareGet(anyString(), anyString(), anyString())).thenReturn(new GetRequestBuilder(client, GetAction.INSTANCE));
doAnswer(inv -> {
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) inv.getArguments()[1];
listener.onFailure(e);
return null;
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
}
public static void mockIndexRequest(Client client, String indexAliasName, Consumer<IndexRequest> consumer) {
doAnswer(inv -> {
final Object[] args = inv.getArguments();