Adds error handling when filling up the queue of the crypto thread pool. Also reduce queue size of the crypto thread pool to 10 so that the queue can be cleared out in time. Test testAuthenticationReturns429WhenThreadPoolIsSaturated has seen failure on CI when it tries to push 1000 tasks into the queue (setup phase). Since multiple tests share the same internal test cluster, it may be possible that there are lingering requests not fully cleared out from the queue. When it happens, we will not be able to push all 1000 tasks into the queue. But since what we need is just queue saturation, so as long as we can be sure that the queue is fully filled, it is safe to ignore rejection error and just move on. A number of 1000 tasks also take some to clear out, which could cause the test suite to time out. This PR change the queue to 10 so the tests would have better chance to complete in time.
This commit is contained in:
parent
e8181fc627
commit
f84b76661d
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.Tuple;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.SecurityIntegTestCase;
|
||||
|
@ -63,9 +64,9 @@ import java.util.Set;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||
|
@ -82,6 +83,7 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
|
||||
public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
||||
private static final long DELETE_INTERVAL_MILLIS = 100L;
|
||||
private static final int CRYPTO_THREAD_POOL_QUEUE_SIZE = 10;
|
||||
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
|
@ -90,6 +92,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
|||
.put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true)
|
||||
.put(ApiKeyService.DELETE_INTERVAL.getKey(), TimeValue.timeValueMillis(DELETE_INTERVAL_MILLIS))
|
||||
.put(ApiKeyService.DELETE_TIMEOUT.getKey(), TimeValue.timeValueSeconds(5L))
|
||||
.put("xpack.security.crypto.thread_pool.queue_size", CRYPTO_THREAD_POOL_QUEUE_SIZE)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -855,7 +858,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
|||
assertApiKeyNotCreated(client,"key-5");
|
||||
}
|
||||
|
||||
public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException {
|
||||
public void testAuthenticationReturns429WhenThreadPoolIsSaturated() throws IOException, InterruptedException, ExecutionException {
|
||||
final String nodeName = randomFrom(internalCluster().getNodeNames());
|
||||
final Settings settings = internalCluster().getInstance(Settings.class, nodeName);
|
||||
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
|
||||
|
@ -882,7 +885,6 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
|||
final int numberOfThreads = (allocatedProcessors + 1) / 2;
|
||||
final CountDownLatch blockingLatch = new CountDownLatch(1);
|
||||
final CountDownLatch readyLatch = new CountDownLatch(numberOfThreads);
|
||||
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
executorService.submit(() -> {
|
||||
readyLatch.countDown();
|
||||
|
@ -894,8 +896,13 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
|||
});
|
||||
}
|
||||
// Fill the whole queue for the crypto thread pool
|
||||
final int queueSize = 1000;
|
||||
IntStream.range(0, queueSize).forEach(i -> executorService.submit(() -> {}));
|
||||
Future<?> lastTaskFuture = null;
|
||||
try {
|
||||
for (int i = 0; i < CRYPTO_THREAD_POOL_QUEUE_SIZE; i++) {
|
||||
lastTaskFuture = executorService.submit(() -> { });
|
||||
}
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
}
|
||||
readyLatch.await();
|
||||
|
||||
try (RestClient restClient = createRestClient(nodeInfos, null, "http")) {
|
||||
|
@ -910,6 +917,9 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
|
|||
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), is(429));
|
||||
} finally {
|
||||
blockingLatch.countDown();
|
||||
if (lastTaskFuture != null) {
|
||||
lastTaskFuture.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class ApiKeyServiceTests extends ESTestCase {
|
|||
threadPool = Mockito.spy(
|
||||
new TestThreadPool("api key service tests",
|
||||
new FixedExecutorBuilder(Settings.EMPTY, SECURITY_CRYPTO_THREAD_POOL_NAME, 1, 1000,
|
||||
"xpack.security.authc.api_key.thread_pool", false))
|
||||
"xpack.security.crypto.thread_pool", false))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -211,7 +211,7 @@ public class AuthenticationServiceTests extends ESTestCase {
|
|||
new FixedExecutorBuilder(settings, THREAD_POOL_NAME, 1, 1000,
|
||||
"xpack.security.authc.token.thread_pool", false),
|
||||
new FixedExecutorBuilder(Settings.EMPTY, SECURITY_CRYPTO_THREAD_POOL_NAME, 1, 1000,
|
||||
"xpack.security.authc.api_key.thread_pool", false)
|
||||
"xpack.security.crypto.thread_pool", false)
|
||||
);
|
||||
threadContext = threadPool.getThreadContext();
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
|
Loading…
Reference in New Issue