test: IndexAuditTrailTests use the proper timestamp to resolve the index name

This changes the IndexAuditTrailTests to use the actual timestamp of the message being indexed to determine
the index name. Some build failures occurred due to running right at the change of an hour and the rollover was
set to hourly. So the message was indexed in one index and the test expected a different index.

Original commit: elastic/x-pack-elasticsearch@9dd5012a73
This commit is contained in:
jaymode 2016-05-26 09:47:59 -04:00
parent 13f9c931b6
commit ab3438cbc4
1 changed files with 55 additions and 93 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.shield.audit.index;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -27,6 +28,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.shield.Security;
import org.elasticsearch.shield.audit.index.IndexAuditTrail.Message;
import org.elasticsearch.shield.transport.netty.ShieldNettyTransport;
import org.elasticsearch.shield.user.SystemUser;
import org.elasticsearch.shield.user.User;
@ -89,6 +91,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
private IndexNameResolver.Rollover rollover;
private IndexAuditTrail auditor;
private SetOnce<Message> enqueuedMessage;
private int numShards;
private int numReplicas;
private ThreadPool threadPool;
@ -255,7 +258,14 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
DummyTransportAddress.INSTANCE);
when(transport.boundAddress()).thenReturn(boundTransportAddress);
threadPool = new ThreadPool("index audit trail tests");
auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class));
enqueuedMessage = new SetOnce<>();
auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)) {
@Override
void enqueue(Message message, String type) {
enqueuedMessage.set(message);
super.enqueue(message, type);
}
};
auditor.start(true);
}
@ -263,9 +273,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.anonymousAccessDenied("_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "anonymous_access_denied");
Map<String, Object> sourceMap = hit.sourceAsMap();
if (message instanceof RemoteHostMockMessage) {
@ -287,9 +296,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
RestRequest request = mockRestRequest();
auditor.anonymousAccessDenied(request);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "rest", "anonymous_access_denied");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -303,9 +310,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
auditor.authenticationFailed(new MockToken(), "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
Map<String, Object> sourceMap = hit.sourceAsMap();
assertAuditMessage(hit, "transport", "authentication_failed");
@ -325,9 +330,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.authenticationFailed("_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "authentication_failed");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -351,9 +354,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
RestRequest request = mockRestRequest();
auditor.authenticationFailed(new MockToken(), request);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "rest", "authentication_failed");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -368,9 +369,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
RestRequest request = mockRestRequest();
auditor.authenticationFailed(request);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "rest", "authentication_failed");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -385,9 +384,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.authenticationFailed("_realm", new MockToken(), "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "authentication_failed");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -413,9 +410,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
RestRequest request = mockRestRequest();
auditor.authenticationFailed("_realm", new MockToken(), request);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "rest", "authentication_failed");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -438,9 +433,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
user = new User("_username", new String[]{"r1"});
}
auditor.accessGranted(user, "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "access_granted");
Map<String, Object> sourceMap = hit.sourceAsMap();
assertEquals("transport", sourceMap.get("origin_type"));
@ -462,9 +456,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize(new String[] { "system_access_granted" }, null);
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
auditor.accessGranted(SystemUser.INSTANCE, "internal:_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "access_granted");
Map<String, Object> sourceMap = hit.sourceAsMap();
assertEquals("transport", sourceMap.get("origin_type"));
@ -485,9 +478,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
user = new User("_username", new String[]{"r1"});
}
auditor.accessDenied(user, "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
Map<String, Object> sourceMap = hit.sourceAsMap();
assertAuditMessage(hit, "transport", "access_denied");
assertEquals("transport", sourceMap.get("origin_type"));
@ -509,9 +501,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
RestRequest request = mockRestRequest();
auditor.tamperedRequest(request);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "rest", "tampered_request");
Map<String, Object> sourceMap = hit.sourceAsMap();
assertThat(sourceMap.get("principal"), nullValue());
@ -525,9 +516,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
initialize();
TransportRequest message = new RemoteHostMockTransportRequest();
auditor.tamperedRequest("_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
Map<String, Object> sourceMap = hit.sourceAsMap();
assertAuditMessage(hit, "transport", "tampered_request");
assertEquals("transport", sourceMap.get("origin_type"));
@ -548,9 +538,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
user = new User("_username", new String[]{"r1"});
}
auditor.tamperedRequest(user, "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "tampered_request");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -570,9 +559,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = IPFilter.DEFAULT_PROFILE_ACCEPT_ALL;
auditor.connectionGranted(inetAddress, "default", rule);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "ip_filter", "connection_granted");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -585,9 +573,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = new ShieldIpFilterRule(false, "_all");
auditor.connectionDenied(inetAddress, "default", rule);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "ip_filter", "connection_denied");
Map<String, Object> sourceMap = hit.sourceAsMap();
@ -600,9 +587,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
User user = new User("_username", new String[]{"r1"}, new User("running as", new String[] {"r2"}));
auditor.runAsGranted(user, "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "run_as_granted");
Map<String, Object> sourceMap = hit.sourceAsMap();
assertEquals("transport", sourceMap.get("origin_type"));
@ -617,9 +603,8 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
User user = new User("_username", new String[]{"r1"}, new User("running as", new String[] {"r2"}));
auditor.runAsDenied(user, "_action", message);
awaitAuditDocumentCreation(resolveIndexName());
SearchHit hit = getIndexedAuditMessage();
SearchHit hit = getIndexedAuditMessage(enqueuedMessage.get());
assertAuditMessage(hit, "transport", "run_as_denied");
Map<String, Object> sourceMap = hit.sourceAsMap();
assertEquals("transport", sourceMap.get("origin_type"));
@ -700,57 +685,38 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
return request;
}
private SearchHit getIndexedAuditMessage() {
private SearchHit getIndexedAuditMessage(Message message) throws InterruptedException {
assertNotNull("no audit message was enqueued", message);
final String indexName = IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, message.timestamp, rollover);
ensureYellow(indexName);
GetSettingsResponse settingsResponse = getClient().admin().indices().prepareGetSettings(indexName).get();
assertThat(settingsResponse.getSetting(indexName, "index.number_of_shards"), is(Integer.toString(numShards)));
assertThat(settingsResponse.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas)));
SearchResponse response = getClient().prepareSearch(resolveIndexName())
.setTypes(IndexAuditTrail.DOC_TYPE)
.execute().actionGet();
final SetOnce<SearchResponse> searchResponseSetOnce = new SetOnce<>();
final boolean found = awaitBusy(() -> {
try {
SearchResponse searchResponse = getClient()
.prepareSearch(indexName)
.setTypes(IndexAuditTrail.DOC_TYPE)
.get();
if (searchResponse.getHits().totalHits() > 0L) {
searchResponseSetOnce.set(searchResponse);
return true;
}
} catch (Exception e) {
logger.debug("caught exception while executing search", e);
}
return false;
});
assertThat("no audit document exists!", found, is(true));
SearchResponse response = searchResponseSetOnce.get();
assertNotNull(response);
assertEquals(1, response.getHits().getTotalHits());
return response.getHits().getHits()[0];
}
private String[] fieldList() {
return new String[] {
"@timestamp",
"node_name",
"node_host_name",
"node_host_address",
"layer",
"event_type",
"origin_address",
"origin_type",
"principal",
"run_by_principal",
"run_as_principal",
"action",
"indices",
"request",
"request_body",
"uri",
"realm",
"transport_profile",
"rule"
};
}
private void awaitAuditDocumentCreation(final String indexName) throws InterruptedException {
ensureYellow(indexName);
boolean found = awaitBusy(() -> {
try {
SearchResponse searchResponse = getClient().prepareSearch(indexName).setSize(0).setTerminateAfter(1).execute().actionGet();
return searchResponse.getHits().totalHits() > 0L;
} catch (Exception e) {
return false;
}
});
assertThat("no audit document exists!", found, is(true));
GetSettingsResponse response = getClient().admin().indices().prepareGetSettings(indexName).execute().actionGet();
assertThat(response.getSetting(indexName, "index.number_of_shards"), is(Integer.toString(numShards)));
assertThat(response.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas)));
}
@Override
public ClusterHealthStatus ensureYellow(String... indices) {
if (remoteIndexing == false) {
@ -771,10 +737,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
return actionGet.getStatus();
}
private String resolveIndexName() {
return IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover);
}
static String remoteHostAddress() throws Exception {
return DummyTransportAddress.INSTANCE.toString();
}