From de48b2426b46021d2c07684cb25d211b1930bf1c Mon Sep 17 00:00:00 2001 From: jaymode Date: Wed, 27 Apr 2016 18:39:51 -0400 Subject: [PATCH] change how audit user is compared, do not setDaemon, test cleanup This commit makes a few modifications to the IndexAuditTrail class: * Use `InternalAuditUser#is` to determine if the principal is the auditor when we have a user and simply compare `InternalAuditUser#NAME` when only a string principal is available * Remove the `Thread#setDaemon` call in the QueueConsumer as this thread should be terminated as part of the shutdown of the node In terms of tests, there are some issues and changes to how we test certain aspects. The muted tests were not accurate since the tests immediately checked for the existence of an index and did not poll or wait and this operation is asynchronous so the index could be created after the exists request was executed. These tests were removed and a new class was added to test the muted behavior. In these tests we override the audit trails implementation of a queue, which will set a flag to indicate a message has been added to the queue. This is a synchronous operation so it can be checked immediately. The other tests in the IndexAuditTrail tests remain but a few changes have been made to the execution. * ensureYellow is called for the index we expect to be created before searching for documents * the remote cluster is only setup at the beginning of the suite rather than before every test to ensure quicker execution * the maximum number of shards has been reduced to three since we do not really need up to 10 shards for a single document Original commit: elastic/x-pack-elasticsearch@501b6ce9da1879c735960c98eb3358f15e0fd025 --- .../shield/audit/index/IndexAuditTrail.java | 105 ++--- .../elasticsearch/shield/user/XPackUser.java | 4 + .../index/IndexAuditTrailMutedTests.java | 274 ++++++++++++ .../audit/index/IndexAuditTrailTests.java | 422 ++++++------------ 4 files changed, 464 insertions(+), 341 deletions(-) create mode 100644 elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java index 3d955b819a3..1e2b4665d66 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java @@ -75,6 +75,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -151,7 +152,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl private final AtomicReference state = new AtomicReference<>(State.INITIALIZED); private final String nodeName; private final Provider clientProvider; - private final LinkedBlockingQueue eventQueue; + private final BlockingQueue eventQueue; private final QueueConsumer queueConsumer; private final Transport transport; private final ThreadPool threadPool; @@ -181,9 +182,8 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl this.clusterService = clusterService; this.nodeName = settings.get("name"); this.queueConsumer = new QueueConsumer(EsExecutors.threadName(settings, "audit-queue-consumer")); - int maxQueueSize = QUEUE_SIZE_SETTING.get(settings); - this.eventQueue = new LinkedBlockingQueue<>(maxQueueSize); + this.eventQueue = createQueue(maxQueueSize); // we have to initialize this here since we use rollover in determining if we can start... rollover = ROLLOVER_SETTING.get(settings); @@ -373,7 +373,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void authenticationFailed(AuthenticationToken token, String action, TransportMessage message) { if (events.contains(AUTHENTICATION_FAILED)) { - if (!principalIsAuditor(token.principal())) { + if (XPackUser.is(token.principal()) == false) { try { enqueue(message("authentication_failed", action, token, null, indices(message), message), "authentication_failed"); } catch (Exception e) { @@ -386,7 +386,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void authenticationFailed(AuthenticationToken token, RestRequest request) { if (events.contains(AUTHENTICATION_FAILED)) { - if (!principalIsAuditor(token.principal())) { + if (XPackUser.is(token.principal()) == false) { try { enqueue(message("authentication_failed", null, token, null, null, request), "authentication_failed"); } catch (Exception e) { @@ -399,7 +399,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void authenticationFailed(String realm, AuthenticationToken token, String action, TransportMessage message) { if (events.contains(AUTHENTICATION_FAILED)) { - if (!principalIsAuditor(token.principal())) { + if (XPackUser.is(token.principal()) == false) { try { enqueue(message("authentication_failed", action, token, realm, indices(message), message), "authentication_failed"); } catch (Exception e) { @@ -412,7 +412,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void authenticationFailed(String realm, AuthenticationToken token, RestRequest request) { if (events.contains(AUTHENTICATION_FAILED)) { - if (!principalIsAuditor(token.principal())) { + if (XPackUser.is(token.principal()) == false) { try { enqueue(message("authentication_failed", null, token, realm, null, request), "authentication_failed"); } catch (Exception e) { @@ -424,35 +424,31 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void accessGranted(User user, String action, TransportMessage message) { - if (!principalIsAuditor(user.principal())) { - // special treatment for internal system actions - only log if explicitly told to - if ((SystemUser.is(user) && SystemPrivilege.INSTANCE.predicate().test(action)) || XPackUser.is(user)) { - if (events.contains(SYSTEM_ACCESS_GRANTED)) { - try { - enqueue(message("access_granted", action, user, indices(message), message), "access_granted"); - } catch (Exception e) { - logger.warn("failed to index audit event: [access_granted]", e); - } - } - } else if (events.contains(ACCESS_GRANTED)) { + // special treatment for internal system actions - only log if explicitly told to + if ((SystemUser.is(user) && SystemPrivilege.INSTANCE.predicate().test(action))) { + if (events.contains(SYSTEM_ACCESS_GRANTED)) { try { enqueue(message("access_granted", action, user, indices(message), message), "access_granted"); } catch (Exception e) { logger.warn("failed to index audit event: [access_granted]", e); } } + } else if (events.contains(ACCESS_GRANTED) && XPackUser.is(user) == false) { + try { + enqueue(message("access_granted", action, user, indices(message), message), "access_granted"); + } catch (Exception e) { + logger.warn("failed to index audit event: [access_granted]", e); + } } } @Override public void accessDenied(User user, String action, TransportMessage message) { - if (events.contains(ACCESS_DENIED)) { - if (!principalIsAuditor(user.principal())) { - try { - enqueue(message("access_denied", action, user, indices(message), message), "access_denied"); - } catch (Exception e) { - logger.warn("failed to index audit event: [access_denied]", e); - } + if (events.contains(ACCESS_DENIED) && XPackUser.is(user) == false) { + try { + enqueue(message("access_denied", action, user, indices(message), message), "access_denied"); + } catch (Exception e) { + logger.warn("failed to index audit event: [access_denied]", e); } } } @@ -481,13 +477,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl @Override public void tamperedRequest(User user, String action, TransportMessage request) { - if (events.contains(TAMPERED_REQUEST)) { - if (!principalIsAuditor(user.principal())) { - try { - enqueue(message("tampered_request", action, user, indices(request), request), "tampered_request"); - } catch (Exception e) { - logger.warn("failed to index audit event: [tampered_request]", e); - } + if (events.contains(TAMPERED_REQUEST) && XPackUser.is(user) == false) { + try { + enqueue(message("tampered_request", action, user, indices(request), request), "tampered_request"); + } catch (Exception e) { + logger.warn("failed to index audit event: [tampered_request]", e); } } } @@ -536,10 +530,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl } } - private boolean principalIsAuditor(String principal) { - return principal.equals(XPackUser.INSTANCE.principal()); - } - private Message message(String type, @Nullable String action, @Nullable User user, @Nullable String[] indices, TransportMessage message) throws Exception { @@ -819,6 +809,10 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl } } + BlockingQueue createQueue(int maxQueueSize) { + return new LinkedBlockingQueue<>(maxQueueSize); + } + private void initializeBulkProcessor() { final int bulkSize = BULK_SIZE_SETTING.get(settings); @@ -900,7 +894,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl QueueConsumer(String name) { super(name); - setDaemon(true); } @Override @@ -947,25 +940,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl } interface Field { - String TIMESTAMP = new String("@timestamp"); - String NODE_NAME = new String("node_name"); - String NODE_HOST_NAME = new String("node_host_name"); - String NODE_HOST_ADDRESS = new String("node_host_address"); - String LAYER = new String("layer"); - String TYPE = new String("event_type"); - String ORIGIN_ADDRESS = new String("origin_address"); - String ORIGIN_TYPE = new String("origin_type"); - String PRINCIPAL = new String("principal"); - String RUN_AS_PRINCIPAL = new String("run_as_principal"); - String RUN_BY_PRINCIPAL = new String("run_by_principal"); - String ACTION = new String("action"); - String INDICES = new String("indices"); - String REQUEST = new String("request"); - String REQUEST_BODY = new String("request_body"); - String URI = new String("uri"); - String REALM = new String("realm"); - String TRANSPORT_PROFILE = new String("transport_profile"); - String RULE = new String("rule"); + String TIMESTAMP = "@timestamp"; + String NODE_NAME = "node_name"; + String NODE_HOST_NAME = "node_host_name"; + String NODE_HOST_ADDRESS = "node_host_address"; + String LAYER = "layer"; + String TYPE = "event_type"; + String ORIGIN_ADDRESS = "origin_address"; + String ORIGIN_TYPE = "origin_type"; + String PRINCIPAL = "principal"; + String RUN_AS_PRINCIPAL = "run_as_principal"; + String RUN_BY_PRINCIPAL = "run_by_principal"; + String ACTION = "action"; + String INDICES = "indices"; + String REQUEST = "request"; + String REQUEST_BODY = "request_body"; + String URI = "uri"; + String REALM = "realm"; + String TRANSPORT_PROFILE = "transport_profile"; + String RULE = "rule"; } public enum State { diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/user/XPackUser.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/user/XPackUser.java index 12a826ca789..84de4f8f0b2 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/user/XPackUser.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/user/XPackUser.java @@ -35,4 +35,8 @@ public class XPackUser extends ReservedUser { public static boolean is(User user) { return INSTANCE.equals(user); } + + public static boolean is(String principal) { + return NAME.equals(principal); + } } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java new file mode 100644 index 00000000000..a91bc3729b5 --- /dev/null +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java @@ -0,0 +1,274 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.shield.audit.index; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.util.Providers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.shield.InternalClient; +import org.elasticsearch.shield.audit.index.IndexAuditTrail.State; +import org.elasticsearch.shield.authc.AuthenticationToken; +import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule; +import org.elasticsearch.shield.user.SystemUser; +import org.elasticsearch.shield.user.User; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportMessage; +import org.junit.After; +import org.junit.Before; + +import java.net.InetAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class IndexAuditTrailMutedTests extends ESTestCase { + + private InternalClient client; + private TransportClient transportClient; + private ThreadPool threadPool; + private Transport transport; + private IndexAuditTrail auditTrail; + + private AtomicBoolean messageEnqueued; + private AtomicBoolean clientCalled; + + @Before + public void setup() { + transport = mock(Transport.class); + when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE }, + DummyTransportAddress.INSTANCE)); + + threadPool = new ThreadPool("index audit trail tests"); + transportClient = TransportClient.builder().settings(Settings.EMPTY).build(); + clientCalled = new AtomicBoolean(false); + client = new InternalClient(transportClient) { + @Override + protected , Response extends ActionResponse, RequestBuilder extends + ActionRequestBuilder> void doExecute( + Action action, Request request, ActionListener listener) { + clientCalled.set(true); + } + }; + messageEnqueued = new AtomicBoolean(false); + } + + @After + public void stop() { + if (auditTrail != null) { + auditTrail.close(); + } + if (transportClient != null) { + transportClient.close(); + } + threadPool.shutdown(); + } + + public void testAnonymousAccessDeniedMutedTransport() { + createAuditTrail(new String[] { "anonymous_access_denied" }); + TransportMessage message = mock(TransportMessage.class); + auditTrail.anonymousAccessDenied("_action", message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message); + } + + public void testAnonymousAccessDeniedMutedRest() { + createAuditTrail(new String[] { "anonymous_access_denied" }); + RestRequest restRequest = mock(RestRequest.class); + auditTrail.anonymousAccessDenied(restRequest); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(restRequest); + } + + public void testAuthenticationFailedMutedTransport() { + createAuditTrail(new String[] { "authentication_failed" }); + TransportMessage message = mock(TransportMessage.class); + AuthenticationToken token = mock(AuthenticationToken.class); + + // with realm + auditTrail.authenticationFailed(randomAsciiOfLengthBetween(2, 10), token, "_action", message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + // without realm + auditTrail.authenticationFailed(token, "_action", message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + // without the token + auditTrail.authenticationFailed("_action", message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(token, message); + } + + public void testAuthenticationFailedMutedRest() { + createAuditTrail(new String[] { "authentication_failed" }); + RestRequest restRequest = mock(RestRequest.class); + AuthenticationToken token = mock(AuthenticationToken.class); + + // with realm + auditTrail.authenticationFailed(randomAsciiOfLengthBetween(2, 10), token, restRequest); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + // without the realm + auditTrail.authenticationFailed(token, restRequest); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + // without the token + auditTrail.authenticationFailed(restRequest); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(token, restRequest); + } + + public void testAccessGrantedMuted() { + createAuditTrail(new String[] { "access_granted" }); + TransportMessage message = mock(TransportMessage.class); + User user = mock(User.class); + auditTrail.accessGranted(user, randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message, user); + } + + public void testSystemAccessGrantedMuted() { + createAuditTrail(randomFrom(new String[] { "access_granted" }, null)); + TransportMessage message = mock(TransportMessage.class); + User user = SystemUser.INSTANCE; + auditTrail.accessGranted(user, "internal:foo", message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message); + } + + public void testAccessDeniedMuted() { + createAuditTrail(new String[] { "access_denied" }); + TransportMessage message = mock(TransportMessage.class); + User user = mock(User.class); + auditTrail.accessDenied(user, randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message, user); + } + + public void testTamperedRequestMuted() { + createAuditTrail(new String[] { "tampered_request" }); + TransportMessage message = mock(TransportMessage.class); + User user = mock(User.class); + + // with user + auditTrail.tamperedRequest(user, randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + // without user + auditTrail.tamperedRequest(randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message, user); + } + + public void testConnectionGrantedMuted() { + createAuditTrail(new String[] { "connection_granted" }); + InetAddress address = mock(InetAddress.class); + ShieldIpFilterRule rule = mock(ShieldIpFilterRule.class); + + auditTrail.connectionGranted(address, randomAsciiOfLengthBetween(1, 12), rule); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(address, rule); + } + + public void testConnectionDeniedMuted() { + createAuditTrail(new String[] { "connection_denied" }); + InetAddress address = mock(InetAddress.class); + ShieldIpFilterRule rule = mock(ShieldIpFilterRule.class); + + auditTrail.connectionDenied(address, randomAsciiOfLengthBetween(1, 12), rule); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(address, rule); + } + + public void testRunAsGrantedMuted() { + createAuditTrail(new String[] { "run_as_granted" }); + TransportMessage message = mock(TransportMessage.class); + User user = mock(User.class); + + auditTrail.runAsGranted(user, randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message, user); + } + + public void testRunAsDeniedMuted() { + createAuditTrail(new String[] { "run_as_denied" }); + TransportMessage message = mock(TransportMessage.class); + User user = mock(User.class); + + auditTrail.runAsDenied(user, randomAsciiOfLengthBetween(6, 40), message); + assertThat(messageEnqueued.get(), is(false)); + assertThat(clientCalled.get(), is(false)); + + verifyZeroInteractions(message, user); + } + + IndexAuditTrail createAuditTrail(String[] excludes) { + Settings settings = IndexAuditTrailTests.levelSettings(null, excludes); + auditTrail = new IndexAuditTrail(settings, transport, Providers.of(client), threadPool, mock(ClusterService.class)) { + @Override + void putTemplate(Settings settings) { + // make this a no-op so we don't have to stub out unnecessary client activities + } + + @Override + BlockingQueue createQueue(int maxQueueSize) { + return new LinkedBlockingQueue(maxQueueSize) { + @Override + public boolean offer(Message message) { + messageEnqueued.set(true); + return super.offer(message); + } + }; + } + }; + auditTrail.start(true); + assertThat(auditTrail.state(), is(State.STARTED)); + return auditTrail; + } +} diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java index 54e7a335a2d..bff7563e563 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java @@ -5,14 +5,17 @@ */ package org.elasticsearch.shield.audit.index; -import org.apache.lucene.util.LuceneTestCase.BadApple; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.util.Providers; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; @@ -21,13 +24,13 @@ import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.shield.Security; import org.elasticsearch.shield.user.SystemUser; import org.elasticsearch.shield.user.User; import org.elasticsearch.shield.authc.AuthenticationToken; +import org.elasticsearch.shield.crypto.InternalCryptoService; import org.elasticsearch.shield.transport.filter.IPFilter; import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule; import org.elasticsearch.test.ESIntegTestCase; @@ -39,11 +42,13 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInfo; import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.xpack.XPackPlugin; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.ISODateTimeFormat; import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import java.io.IOException; import java.net.InetAddress; @@ -61,8 +66,6 @@ import static org.elasticsearch.shield.audit.index.IndexNameResolver.Rollover.MO import static org.elasticsearch.shield.audit.index.IndexNameResolver.Rollover.WEEKLY; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; import static org.elasticsearch.test.InternalTestCluster.clusterName; -import static org.elasticsearch.test.ShieldSettingsSource.DEFAULT_PASSWORD; -import static org.elasticsearch.test.ShieldSettingsSource.DEFAULT_USER_NAME; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -74,26 +77,120 @@ import static org.mockito.Mockito.when; /** * */ -//test is just too slow, please fix it to not be sleep-based -@BadApple(bugUrl = "https://github.com/elastic/x-plugins/issues/1007") @ESIntegTestCase.ClusterScope(scope = SUITE, numDataNodes = 1) public class IndexAuditTrailTests extends ShieldIntegTestCase { public static final String SECOND_CLUSTER_NODE_PREFIX = "remote_" + SUITE_CLUSTER_NODE_PREFIX; + private static boolean remoteIndexing; + private static InternalTestCluster remoteCluster; + private static Settings remoteSettings; + private static byte[] systemKey; + private IndexNameResolver.Rollover rollover; private IndexAuditTrail auditor; - private boolean remoteIndexing = false; - private InternalTestCluster cluster2; - private Client remoteClient; private int numShards; private int numReplicas; private ThreadPool threadPool; + @BeforeClass + public static void configureBeforeClass() { + remoteIndexing = randomBoolean(); + systemKey = InternalCryptoService.generateKey(); + if (remoteIndexing == false) { + remoteSettings = Settings.EMPTY; + } + } + + @AfterClass + public static void cleanupAfterTest() { + if (remoteCluster != null) { + remoteCluster.close(); + remoteCluster = null; + } + remoteSettings = null; + } + + @Before + public void initializeRemoteClusterIfNecessary() throws Exception { + if (remoteIndexing == false) { + logger.info("--> remote indexing disabled."); + return; + } + + if (remoteCluster != null) { + return; + } + + // create another cluster + String cluster2Name = clusterName(Scope.SUITE.name(), randomLong()); + + // Setup a second test cluster with randomization for number of nodes, shield enabled, and SSL + final int numNodes = randomIntBetween(1, 2); + final boolean useShield = randomBoolean(); + final boolean useSSL = useShield && randomBoolean(); + logger.info("--> remote indexing enabled. shield enabled: [{}], SSL enabled: [{}], nodes: [{}]", useShield, useSSL, numNodes); + ShieldSettingsSource cluster2SettingsSource = + new ShieldSettingsSource(numNodes, useSSL, systemKey(), createTempDir(), Scope.SUITE) { + @Override + public Settings nodeSettings(int nodeOrdinal) { + Settings.Builder builder = Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Security.enabledSetting(), useShield); + return builder.build(); + } + }; + remoteCluster = new InternalTestCluster("network", randomLong(), createTempDir(), numNodes, numNodes, cluster2Name, + cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(), + useShield ? getClientWrapper() : Function.identity()); + remoteCluster.beforeTest(random(), 0.5); + + NodesInfoResponse response = remoteCluster.client().admin().cluster().prepareNodesInfo().execute().actionGet(); + TransportInfo info = response.getNodes()[0].getTransport(); + InetSocketTransportAddress inet = (InetSocketTransportAddress) info.address().publishAddress(); + + Settings.Builder builder = Settings.builder() + .put(Security.enabledSetting(), useShield) + .put(remoteSettings(NetworkAddress.format(inet.address().getAddress()), inet.address().getPort(), cluster2Name)) + .put("xpack.security.audit.index.client.xpack.security.user", ShieldSettingsSource.DEFAULT_USER_NAME + ":" + + ShieldSettingsSource.DEFAULT_PASSWORD); + + if (useSSL) { + for (Map.Entry entry : cluster2SettingsSource.getClientSSLSettings().getAsMap().entrySet()) { + builder.put("xpack.security.audit.index.client." + entry.getKey(), entry.getValue()); + } + } + remoteSettings = builder.build(); + } + + @After + public void afterTest() { + if (threadPool != null) { + threadPool.shutdown(); + } + if (auditor != null) { + auditor.close(); + } + + if (remoteCluster != null) { + remoteCluster.wipe(Collections.emptySet()); + } + } + @Override protected Set excludeTemplates() { return Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME); } + @Override + protected byte[] systemKey() { + return systemKey; + } + + @Override + protected int maximumNumberOfShards() { + return 3; + } + private Settings commonSettings(IndexNameResolver.Rollover rollover) { return Settings.builder() .put("xpack.security.audit.enabled", true) @@ -106,14 +203,14 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { .build(); } - private Settings remoteSettings(String address, int port, String clusterName) { + static Settings remoteSettings(String address, int port, String clusterName) { return Settings.builder() .put("xpack.security.audit.index.client.hosts", address + ":" + port) .put("xpack.security.audit.index.client.cluster.name", clusterName) .build(); } - private Settings levelSettings(String[] includes, String[] excludes) { + static Settings levelSettings(String[] includes, String[] excludes) { Settings.Builder builder = Settings.builder(); if (includes != null) { builder.putArray("xpack.security.audit.index.events.include", includes); @@ -132,92 +229,33 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { } private Client getClient() { - return remoteIndexing ? remoteClient : client(); + return remoteIndexing ? remoteCluster.client() : client(); } - private void initialize(String... excludes) throws IOException, InterruptedException { - initialize(null, excludes); + private void initialize() throws IOException, InterruptedException { + initialize(null, null); } private void initialize(String[] includes, String[] excludes) throws IOException, InterruptedException { rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY); numReplicas = numberOfReplicas(); numShards = numberOfShards(); - Settings settings = settings(rollover, includes, excludes); - remoteIndexing = randomBoolean(); - + Settings.Builder builder = Settings.builder(); if (remoteIndexing) { - // create another cluster - String cluster2Name = clusterName(Scope.SUITE.name(), randomLong()); - - // Setup a second test cluster with randomization for number of nodes, shield enabled, and SSL - final int numNodes = randomIntBetween(1, 2); - final boolean useShield = randomBoolean(); - final boolean useSSL = useShield && randomBoolean(); - logger.info("--> remote indexing enabled. shield enabled: [{}], SSL enabled: [{}]", useShield, useSSL); - ShieldSettingsSource cluster2SettingsSource = new ShieldSettingsSource(numNodes, useSSL, systemKey(), createTempDir(), - Scope.SUITE) { - @Override - public Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(XPackPlugin.featureEnabledSetting(Security.NAME), useShield); - return builder.build(); - } - }; - cluster2 = new InternalTestCluster("network", randomLong(), createTempDir(), numNodes, numNodes, cluster2Name, - cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(), - useShield ? getClientWrapper() : Function.identity()); - cluster2.beforeTest(random(), 0.5); - remoteClient = cluster2.client(); - - NodesInfoResponse response = remoteClient.admin().cluster().prepareNodesInfo().execute().actionGet(); - TransportInfo info = response.getNodes()[0].getTransport(); - InetSocketTransportAddress inet = (InetSocketTransportAddress) info.address().publishAddress(); - - Settings.Builder builder = Settings.builder() - .put(settings) - .put(XPackPlugin.featureEnabledSetting(Security.NAME), useShield) - .put(remoteSettings(NetworkAddress.format(inet.address().getAddress()), inet.address().getPort(), cluster2Name)) - .put("xpack.security.audit.index.client." + Security.USER_SETTING.getKey(), DEFAULT_USER_NAME + ":" + DEFAULT_PASSWORD); - - if (useSSL) { - for (Map.Entry entry : cluster2SettingsSource.getClientSSLSettings().getAsMap().entrySet()) { - builder.put("xpack.security.audit.index.client." + entry.getKey(), entry.getValue()); - } - } - settings = builder.build(); + builder.put(remoteSettings); } - settings = Settings.builder().put(settings).put("path.home", createTempDir()).build(); + Settings settings = builder.put(settings(rollover, includes, excludes)).build(); logger.info("--> settings: [{}]", settings.getAsMap().toString()); Transport transport = mock(Transport.class); BoundTransportAddress boundTransportAddress = new BoundTransportAddress(new TransportAddress[]{DummyTransportAddress.INSTANCE}, DummyTransportAddress.INSTANCE); when(transport.boundAddress()).thenReturn(boundTransportAddress); - threadPool = new ThreadPool("index audit trail tests"); auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)); auditor.start(true); } - @After - public void afterTest() { - if (threadPool != null) { - threadPool.shutdown(); - } - if (auditor != null) { - auditor.close(); - } - - cluster().wipe(Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME)); - if (remoteIndexing && cluster2 != null) { - cluster2.wipe(Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME)); - remoteClient.close(); - cluster2.close(); - } - } - public void testAnonymousAccessDeniedTransport() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -242,18 +280,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testAnonymousAccessDeniedTransportMuted() throws Exception { - initialize("anonymous_access_denied"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.anonymousAccessDenied("_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAnonymousAccessDeniedRest() throws Exception { initialize(); RestRequest request = mockRestRequest(); @@ -270,18 +296,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertThat(sourceMap.get("request_body"), notNullValue()); } - public void testAnonymousAccessDeniedRestMuted() throws Exception { - initialize("anonymous_access_denied"); - RestRequest request = mockRestRequest(); - auditor.anonymousAccessDenied(request); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAuthenticationFailedTransport() throws Exception { initialize(); TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); @@ -330,30 +344,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testAuthenticationFailed_Transport_Muted() throws Exception { - initialize("authentication_failed"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.authenticationFailed(new MockToken(), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - - public void testAuthenticationFailedTransportNoTokenMuted() throws Exception { - initialize("authentication_failed"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.authenticationFailed("_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAuthenticationFailedRest() throws Exception { initialize(); RestRequest request = mockRestRequest(); @@ -388,30 +378,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertThat(sourceMap.get("request_body"), notNullValue()); } - public void testAuthenticationFailedRestMuted() throws Exception { - initialize("authentication_failed"); - RestRequest request = mockRestRequest(); - auditor.authenticationFailed(new MockToken(), request); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - - public void testAuthenticationFailedRestNoTokenMuted() throws Exception { - initialize("authentication_failed"); - RestRequest request = mockRestRequest(); - auditor.authenticationFailed(request); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAuthenticationFailedTransportRealm() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -440,18 +406,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testAuthenticationFailedTransportRealmMuted() throws Exception { - initialize("authentication_failed"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.authenticationFailed("_realm", new MockToken(), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAuthenticationFailedRestRealm() throws Exception { initialize(); RestRequest request = mockRestRequest(); @@ -469,18 +423,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertThat(sourceMap.get("request_body"), notNullValue()); } - public void testAuthenticationFailedRestRealmMuted() throws Exception { - initialize("authentication_failed"); - RestRequest request = mockRestRequest(); - auditor.authenticationFailed("_realm", new MockToken(), request); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAccessGranted() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -513,17 +455,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testAccessGrantedMuted() throws Exception { - initialize("access_granted"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.accessGranted(new User("_username", "r1"), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } public void testSystemAccessGranted() throws Exception { initialize(new String[] { "system_access_granted" }, null); TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); @@ -539,18 +470,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testSystemAccessGrantedMuted() throws Exception { - initialize(); - TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); - auditor.accessGranted(SystemUser.INSTANCE, "internal:_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testAccessDenied() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -583,18 +502,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testAccessDenied_Muted() throws Exception { - initialize("access_denied"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.accessDenied(new User("_username", "r1"), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testTamperedRequestRest() throws Exception { initialize(); RestRequest request = mockRestRequest(); @@ -655,32 +562,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testTamperedRequestMuted() throws Exception { - initialize("tampered_request"); - TransportRequest message = new RemoteHostMockTransportRequest(); - final int type = randomIntBetween(0, 2); - switch (type) { - case 0: - auditor.tamperedRequest(new User("_username", new String[]{"r1"}), "_action", message); - break; - case 1: - auditor.tamperedRequest("_action", message); - break; - case 2: - auditor.tamperedRequest(mockRestRequest()); - break; - default: - throw new IllegalStateException("invalid value for type: " + type); - } - - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testConnectionGranted() throws Exception { initialize(); InetAddress inetAddress = InetAddress.getLoopbackAddress(); @@ -696,19 +577,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals("default", sourceMap.get("transport_profile")); } - public void testConnectionGrantedMuted() throws Exception { - initialize("connection_granted"); - InetAddress inetAddress = InetAddress.getLoopbackAddress(); - ShieldIpFilterRule rule = IPFilter.DEFAULT_PROFILE_ACCEPT_ALL; - auditor.connectionGranted(inetAddress, "default", rule); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testConnectionDenied() throws Exception { initialize(); InetAddress inetAddress = InetAddress.getLoopbackAddress(); @@ -724,19 +592,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals("default", sourceMap.get("transport_profile")); } - public void testConnectionDeniedMuted() throws Exception { - initialize("connection_denied"); - InetAddress inetAddress = InetAddress.getLoopbackAddress(); - ShieldIpFilterRule rule = new ShieldIpFilterRule(false, "_all"); - auditor.connectionDenied(inetAddress, "default", rule); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testRunAsGranted() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -754,18 +609,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testRunAsGrantedMuted() throws Exception { - initialize("run_as_granted"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.runAsGranted(new User("_username", new String[]{"r1"}, new User("running as", new String[]{"r2"})), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - public void testRunAsDenied() throws Exception { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); @@ -783,18 +626,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertEquals(sourceMap.get("request"), message.getClass().getSimpleName()); } - public void testRunAsDeniedMuted() throws Exception { - initialize("run_as_denied"); - TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); - auditor.runAsDenied(new User("_username", new String[]{"r1"}, new User("running as", new String[]{"r2"})), "_action", message); - try { - getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet(); - fail("Expected IndexNotFoundException"); - } catch (IndexNotFoundException e) { - assertThat(e.getMessage(), is("no such index")); - } - } - private void assertAuditMessage(SearchHit hit, String layer, String type) { Map sourceMap = hit.sourceAsMap(); assertThat(sourceMap.get("@timestamp"), notNullValue()); @@ -901,10 +732,11 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { } private void awaitAuditDocumentCreation(final String indexName) throws InterruptedException { + ensureYellow(indexName); boolean found = awaitBusy(() -> { try { SearchResponse searchResponse = getClient().prepareSearch(indexName).setSize(0).setTerminateAfter(1).execute().actionGet(); - return searchResponse.getHits().totalHits() > 0; + return searchResponse.getHits().totalHits() > 0L; } catch (Exception e) { return false; } @@ -916,6 +748,26 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { assertThat(response.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas))); } + @Override + public ClusterHealthStatus ensureYellow(String... indices) { + if (remoteIndexing == false) { + return super.ensureYellow(indices); + } + + // pretty ugly but just a rip of ensureYellow that uses a different client + ClusterHealthResponse actionGet = getClient().admin().cluster().health(Requests.clusterHealthRequest(indices) + .waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); + if (actionGet.isTimedOut()) { + logger.info("ensureYellow timed out, cluster state:\n{}\n{}", + getClient().admin().cluster().prepareState().get().getState().prettyPrint(), + getClient().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); + assertThat("timed out waiting for yellow", actionGet.isTimedOut(), equalTo(false)); + } + + logger.debug("indices {} are yellow", indices.length == 0 ? "[_all]" : indices); + return actionGet.getStatus(); + } + private String resolveIndexName() { return IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover); }