From ab3438cbc4c32797c766fd782e51051b06e14271 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 26 May 2016 09:47:59 -0400 Subject: [PATCH] test: IndexAuditTrailTests use the proper timestamp to resolve the index name This changes the IndexAuditTrailTests to use the actual timestamp of the message being indexed to determine the index name. Some build failures occurred due to running right at the change of an hour and the rollover was set to hourly. So the message was indexed in one index and the test expected a different index. Original commit: elastic/x-pack-elasticsearch@9dd5012a73876ad73f6e0d928e275ed724ef2005 --- .../audit/index/IndexAuditTrailTests.java | 148 +++++++----------- 1 file changed, 55 insertions(+), 93 deletions(-) diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java index 1ff18a540dd..ef210aa05bf 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.shield.audit.index; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; @@ -27,6 +28,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.shield.Security; +import org.elasticsearch.shield.audit.index.IndexAuditTrail.Message; import org.elasticsearch.shield.transport.netty.ShieldNettyTransport; import org.elasticsearch.shield.user.SystemUser; import org.elasticsearch.shield.user.User; @@ -89,6 +91,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { private IndexNameResolver.Rollover rollover; private IndexAuditTrail auditor; + private SetOnce enqueuedMessage; private int numShards; private int numReplicas; private ThreadPool threadPool; @@ -255,7 +258,14 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { DummyTransportAddress.INSTANCE); when(transport.boundAddress()).thenReturn(boundTransportAddress); threadPool = new ThreadPool("index audit trail tests"); - auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)); + enqueuedMessage = new SetOnce<>(); + auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)) { + @Override + void enqueue(Message message, String type) { + enqueuedMessage.set(message); + super.enqueue(message, type); + } + }; auditor.start(true); } @@ -263,9 +273,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); auditor.anonymousAccessDenied("_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "anonymous_access_denied"); Map sourceMap = hit.sourceAsMap(); if (message instanceof RemoteHostMockMessage) { @@ -287,9 +296,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); RestRequest request = mockRestRequest(); auditor.anonymousAccessDenied(request); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "rest", "anonymous_access_denied"); Map sourceMap = hit.sourceAsMap(); @@ -303,9 +310,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); auditor.authenticationFailed(new MockToken(), "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); Map sourceMap = hit.sourceAsMap(); assertAuditMessage(hit, "transport", "authentication_failed"); @@ -325,9 +330,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); auditor.authenticationFailed("_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "authentication_failed"); Map sourceMap = hit.sourceAsMap(); @@ -351,9 +354,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); RestRequest request = mockRestRequest(); auditor.authenticationFailed(new MockToken(), request); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "rest", "authentication_failed"); Map sourceMap = hit.sourceAsMap(); @@ -368,9 +369,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); RestRequest request = mockRestRequest(); auditor.authenticationFailed(request); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "rest", "authentication_failed"); Map sourceMap = hit.sourceAsMap(); @@ -385,9 +384,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); auditor.authenticationFailed("_realm", new MockToken(), "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "authentication_failed"); Map sourceMap = hit.sourceAsMap(); @@ -413,9 +410,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); RestRequest request = mockRestRequest(); auditor.authenticationFailed("_realm", new MockToken(), request); - awaitAuditDocumentCreation(resolveIndexName()); - - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "rest", "authentication_failed"); Map sourceMap = hit.sourceAsMap(); @@ -438,9 +433,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { user = new User("_username", new String[]{"r1"}); } auditor.accessGranted(user, "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "access_granted"); Map sourceMap = hit.sourceAsMap(); assertEquals("transport", sourceMap.get("origin_type")); @@ -462,9 +456,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(new String[] { "system_access_granted" }, null); TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); auditor.accessGranted(SystemUser.INSTANCE, "internal:_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "access_granted"); Map sourceMap = hit.sourceAsMap(); assertEquals("transport", sourceMap.get("origin_type")); @@ -485,9 +478,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { user = new User("_username", new String[]{"r1"}); } auditor.accessDenied(user, "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); Map sourceMap = hit.sourceAsMap(); assertAuditMessage(hit, "transport", "access_denied"); assertEquals("transport", sourceMap.get("origin_type")); @@ -509,9 +501,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); RestRequest request = mockRestRequest(); auditor.tamperedRequest(request); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "rest", "tampered_request"); Map sourceMap = hit.sourceAsMap(); assertThat(sourceMap.get("principal"), nullValue()); @@ -525,9 +516,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { initialize(); TransportRequest message = new RemoteHostMockTransportRequest(); auditor.tamperedRequest("_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); Map sourceMap = hit.sourceAsMap(); assertAuditMessage(hit, "transport", "tampered_request"); assertEquals("transport", sourceMap.get("origin_type")); @@ -548,9 +538,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { user = new User("_username", new String[]{"r1"}); } auditor.tamperedRequest(user, "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "tampered_request"); Map sourceMap = hit.sourceAsMap(); @@ -570,9 +559,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { InetAddress inetAddress = InetAddress.getLoopbackAddress(); ShieldIpFilterRule rule = IPFilter.DEFAULT_PROFILE_ACCEPT_ALL; auditor.connectionGranted(inetAddress, "default", rule); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "ip_filter", "connection_granted"); Map sourceMap = hit.sourceAsMap(); @@ -585,9 +573,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { InetAddress inetAddress = InetAddress.getLoopbackAddress(); ShieldIpFilterRule rule = new ShieldIpFilterRule(false, "_all"); auditor.connectionDenied(inetAddress, "default", rule); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "ip_filter", "connection_denied"); Map sourceMap = hit.sourceAsMap(); @@ -600,9 +587,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); User user = new User("_username", new String[]{"r1"}, new User("running as", new String[] {"r2"})); auditor.runAsGranted(user, "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "run_as_granted"); Map sourceMap = hit.sourceAsMap(); assertEquals("transport", sourceMap.get("origin_type")); @@ -617,9 +603,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage()); User user = new User("_username", new String[]{"r1"}, new User("running as", new String[] {"r2"})); auditor.runAsDenied(user, "_action", message); - awaitAuditDocumentCreation(resolveIndexName()); - SearchHit hit = getIndexedAuditMessage(); + SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get()); assertAuditMessage(hit, "transport", "run_as_denied"); Map sourceMap = hit.sourceAsMap(); assertEquals("transport", sourceMap.get("origin_type")); @@ -700,57 +685,38 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { return request; } - private SearchHit getIndexedAuditMessage() { + private SearchHit getIndexedAuditMessage(Message message) throws InterruptedException { + assertNotNull("no audit message was enqueued", message); + final String indexName = IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, message.timestamp, rollover); + ensureYellow(indexName); + GetSettingsResponse settingsResponse = getClient().admin().indices().prepareGetSettings(indexName).get(); + assertThat(settingsResponse.getSetting(indexName, "index.number_of_shards"), is(Integer.toString(numShards))); + assertThat(settingsResponse.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas))); - SearchResponse response = getClient().prepareSearch(resolveIndexName()) - .setTypes(IndexAuditTrail.DOC_TYPE) - .execute().actionGet(); + final SetOnce searchResponseSetOnce = new SetOnce<>(); + final boolean found = awaitBusy(() -> { + try { + SearchResponse searchResponse = getClient() + .prepareSearch(indexName) + .setTypes(IndexAuditTrail.DOC_TYPE) + .get(); + if (searchResponse.getHits().totalHits() > 0L) { + searchResponseSetOnce.set(searchResponse); + return true; + } + } catch (Exception e) { + logger.debug("caught exception while executing search", e); + } + return false; + }); + assertThat("no audit document exists!", found, is(true)); + SearchResponse response = searchResponseSetOnce.get(); + assertNotNull(response); assertEquals(1, response.getHits().getTotalHits()); return response.getHits().getHits()[0]; } - private String[] fieldList() { - return new String[] { - "@timestamp", - "node_name", - "node_host_name", - "node_host_address", - "layer", - "event_type", - "origin_address", - "origin_type", - "principal", - "run_by_principal", - "run_as_principal", - "action", - "indices", - "request", - "request_body", - "uri", - "realm", - "transport_profile", - "rule" - }; - } - - private void awaitAuditDocumentCreation(final String indexName) throws InterruptedException { - ensureYellow(indexName); - boolean found = awaitBusy(() -> { - try { - SearchResponse searchResponse = getClient().prepareSearch(indexName).setSize(0).setTerminateAfter(1).execute().actionGet(); - return searchResponse.getHits().totalHits() > 0L; - } catch (Exception e) { - return false; - } - }); - assertThat("no audit document exists!", found, is(true)); - - GetSettingsResponse response = getClient().admin().indices().prepareGetSettings(indexName).execute().actionGet(); - assertThat(response.getSetting(indexName, "index.number_of_shards"), is(Integer.toString(numShards))); - assertThat(response.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas))); - } - @Override public ClusterHealthStatus ensureYellow(String... indices) { if (remoteIndexing == false) { @@ -771,10 +737,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { return actionGet.getStatus(); } - private String resolveIndexName() { - return IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover); - } - static String remoteHostAddress() throws Exception { return DummyTransportAddress.INSTANCE.toString(); }