change how audit user is compared, do not setDaemon, test cleanup

This commit makes a few modifications to the IndexAuditTrail class:

* Use `InternalAuditUser#is` to determine if the principal is the auditor when we have a user
and simply compare `InternalAuditUser#NAME` when only a string principal is available
* Remove the `Thread#setDaemon` call in the QueueConsumer as this thread should be terminated
as part of the shutdown of the node

In terms of tests, there are some issues and changes to how we test certain aspects. The muted tests
were not accurate since the tests immediately checked for the existence of an index and did not poll or
wait and this operation is asynchronous so the index could be created after the exists request was
executed. These tests were removed and a new class was added to test the muted behavior. In these
tests we override the audit trails implementation of a queue, which will set a flag to indicate a message
has been added to the queue. This is a synchronous operation so it can be checked immediately.

The other tests in the IndexAuditTrail tests remain but a few changes have been made to the execution.

* ensureYellow is called for the index we expect to be created before searching for documents
* the remote cluster is only setup at the beginning of the suite rather than before every test to ensure
quicker execution
* the maximum number of shards has been reduced to three since we do not really need up to 10 shards
for a single document

Original commit: elastic/x-pack-elasticsearch@501b6ce9da
This commit is contained in:
jaymode 2016-04-27 18:39:51 -04:00
parent 27f0a68a28
commit de48b2426b
4 changed files with 464 additions and 341 deletions

View File

@ -75,6 +75,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@ -151,7 +152,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final String nodeName;
private final Provider<InternalClient> clientProvider;
private final LinkedBlockingQueue<Message> eventQueue;
private final BlockingQueue<Message> eventQueue;
private final QueueConsumer queueConsumer;
private final Transport transport;
private final ThreadPool threadPool;
@ -181,9 +182,8 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
this.clusterService = clusterService;
this.nodeName = settings.get("name");
this.queueConsumer = new QueueConsumer(EsExecutors.threadName(settings, "audit-queue-consumer"));
int maxQueueSize = QUEUE_SIZE_SETTING.get(settings);
this.eventQueue = new LinkedBlockingQueue<>(maxQueueSize);
this.eventQueue = createQueue(maxQueueSize);
// we have to initialize this here since we use rollover in determining if we can start...
rollover = ROLLOVER_SETTING.get(settings);
@ -373,7 +373,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void authenticationFailed(AuthenticationToken token, String action, TransportMessage message) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
if (XPackUser.is(token.principal()) == false) {
try {
enqueue(message("authentication_failed", action, token, null, indices(message), message), "authentication_failed");
} catch (Exception e) {
@ -386,7 +386,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void authenticationFailed(AuthenticationToken token, RestRequest request) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
if (XPackUser.is(token.principal()) == false) {
try {
enqueue(message("authentication_failed", null, token, null, null, request), "authentication_failed");
} catch (Exception e) {
@ -399,7 +399,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void authenticationFailed(String realm, AuthenticationToken token, String action, TransportMessage message) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
if (XPackUser.is(token.principal()) == false) {
try {
enqueue(message("authentication_failed", action, token, realm, indices(message), message), "authentication_failed");
} catch (Exception e) {
@ -412,7 +412,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void authenticationFailed(String realm, AuthenticationToken token, RestRequest request) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
if (XPackUser.is(token.principal()) == false) {
try {
enqueue(message("authentication_failed", null, token, realm, null, request), "authentication_failed");
} catch (Exception e) {
@ -424,35 +424,31 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void accessGranted(User user, String action, TransportMessage message) {
if (!principalIsAuditor(user.principal())) {
// special treatment for internal system actions - only log if explicitly told to
if ((SystemUser.is(user) && SystemPrivilege.INSTANCE.predicate().test(action)) || XPackUser.is(user)) {
if (events.contains(SYSTEM_ACCESS_GRANTED)) {
try {
enqueue(message("access_granted", action, user, indices(message), message), "access_granted");
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
} else if (events.contains(ACCESS_GRANTED)) {
// special treatment for internal system actions - only log if explicitly told to
if ((SystemUser.is(user) && SystemPrivilege.INSTANCE.predicate().test(action))) {
if (events.contains(SYSTEM_ACCESS_GRANTED)) {
try {
enqueue(message("access_granted", action, user, indices(message), message), "access_granted");
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
} else if (events.contains(ACCESS_GRANTED) && XPackUser.is(user) == false) {
try {
enqueue(message("access_granted", action, user, indices(message), message), "access_granted");
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
}
@Override
public void accessDenied(User user, String action, TransportMessage message) {
if (events.contains(ACCESS_DENIED)) {
if (!principalIsAuditor(user.principal())) {
try {
enqueue(message("access_denied", action, user, indices(message), message), "access_denied");
} catch (Exception e) {
logger.warn("failed to index audit event: [access_denied]", e);
}
if (events.contains(ACCESS_DENIED) && XPackUser.is(user) == false) {
try {
enqueue(message("access_denied", action, user, indices(message), message), "access_denied");
} catch (Exception e) {
logger.warn("failed to index audit event: [access_denied]", e);
}
}
}
@ -481,13 +477,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
@Override
public void tamperedRequest(User user, String action, TransportMessage request) {
if (events.contains(TAMPERED_REQUEST)) {
if (!principalIsAuditor(user.principal())) {
try {
enqueue(message("tampered_request", action, user, indices(request), request), "tampered_request");
} catch (Exception e) {
logger.warn("failed to index audit event: [tampered_request]", e);
}
if (events.contains(TAMPERED_REQUEST) && XPackUser.is(user) == false) {
try {
enqueue(message("tampered_request", action, user, indices(request), request), "tampered_request");
} catch (Exception e) {
logger.warn("failed to index audit event: [tampered_request]", e);
}
}
}
@ -536,10 +530,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
}
private boolean principalIsAuditor(String principal) {
return principal.equals(XPackUser.INSTANCE.principal());
}
private Message message(String type, @Nullable String action, @Nullable User user,
@Nullable String[] indices, TransportMessage message) throws Exception {
@ -819,6 +809,10 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
}
BlockingQueue<Message> createQueue(int maxQueueSize) {
return new LinkedBlockingQueue<>(maxQueueSize);
}
private void initializeBulkProcessor() {
final int bulkSize = BULK_SIZE_SETTING.get(settings);
@ -900,7 +894,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
QueueConsumer(String name) {
super(name);
setDaemon(true);
}
@Override
@ -947,25 +940,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
interface Field {
String TIMESTAMP = new String("@timestamp");
String NODE_NAME = new String("node_name");
String NODE_HOST_NAME = new String("node_host_name");
String NODE_HOST_ADDRESS = new String("node_host_address");
String LAYER = new String("layer");
String TYPE = new String("event_type");
String ORIGIN_ADDRESS = new String("origin_address");
String ORIGIN_TYPE = new String("origin_type");
String PRINCIPAL = new String("principal");
String RUN_AS_PRINCIPAL = new String("run_as_principal");
String RUN_BY_PRINCIPAL = new String("run_by_principal");
String ACTION = new String("action");
String INDICES = new String("indices");
String REQUEST = new String("request");
String REQUEST_BODY = new String("request_body");
String URI = new String("uri");
String REALM = new String("realm");
String TRANSPORT_PROFILE = new String("transport_profile");
String RULE = new String("rule");
String TIMESTAMP = "@timestamp";
String NODE_NAME = "node_name";
String NODE_HOST_NAME = "node_host_name";
String NODE_HOST_ADDRESS = "node_host_address";
String LAYER = "layer";
String TYPE = "event_type";
String ORIGIN_ADDRESS = "origin_address";
String ORIGIN_TYPE = "origin_type";
String PRINCIPAL = "principal";
String RUN_AS_PRINCIPAL = "run_as_principal";
String RUN_BY_PRINCIPAL = "run_by_principal";
String ACTION = "action";
String INDICES = "indices";
String REQUEST = "request";
String REQUEST_BODY = "request_body";
String URI = "uri";
String REALM = "realm";
String TRANSPORT_PROFILE = "transport_profile";
String RULE = "rule";
}
public enum State {

View File

@ -35,4 +35,8 @@ public class XPackUser extends ReservedUser {
public static boolean is(User user) {
return INSTANCE.equals(user);
}
public static boolean is(String principal) {
return NAME.equals(principal);
}
}

View File

@ -0,0 +1,274 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.audit.index;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.shield.InternalClient;
import org.elasticsearch.shield.audit.index.IndexAuditTrail.State;
import org.elasticsearch.shield.authc.AuthenticationToken;
import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule;
import org.elasticsearch.shield.user.SystemUser;
import org.elasticsearch.shield.user.User;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportMessage;
import org.junit.After;
import org.junit.Before;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class IndexAuditTrailMutedTests extends ESTestCase {
private InternalClient client;
private TransportClient transportClient;
private ThreadPool threadPool;
private Transport transport;
private IndexAuditTrail auditTrail;
private AtomicBoolean messageEnqueued;
private AtomicBoolean clientCalled;
@Before
public void setup() {
transport = mock(Transport.class);
when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE },
DummyTransportAddress.INSTANCE));
threadPool = new ThreadPool("index audit trail tests");
transportClient = TransportClient.builder().settings(Settings.EMPTY).build();
clientCalled = new AtomicBoolean(false);
client = new InternalClient(transportClient) {
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
clientCalled.set(true);
}
};
messageEnqueued = new AtomicBoolean(false);
}
@After
public void stop() {
if (auditTrail != null) {
auditTrail.close();
}
if (transportClient != null) {
transportClient.close();
}
threadPool.shutdown();
}
public void testAnonymousAccessDeniedMutedTransport() {
createAuditTrail(new String[] { "anonymous_access_denied" });
TransportMessage message = mock(TransportMessage.class);
auditTrail.anonymousAccessDenied("_action", message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message);
}
public void testAnonymousAccessDeniedMutedRest() {
createAuditTrail(new String[] { "anonymous_access_denied" });
RestRequest restRequest = mock(RestRequest.class);
auditTrail.anonymousAccessDenied(restRequest);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(restRequest);
}
public void testAuthenticationFailedMutedTransport() {
createAuditTrail(new String[] { "authentication_failed" });
TransportMessage message = mock(TransportMessage.class);
AuthenticationToken token = mock(AuthenticationToken.class);
// with realm
auditTrail.authenticationFailed(randomAsciiOfLengthBetween(2, 10), token, "_action", message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
// without realm
auditTrail.authenticationFailed(token, "_action", message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
// without the token
auditTrail.authenticationFailed("_action", message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(token, message);
}
public void testAuthenticationFailedMutedRest() {
createAuditTrail(new String[] { "authentication_failed" });
RestRequest restRequest = mock(RestRequest.class);
AuthenticationToken token = mock(AuthenticationToken.class);
// with realm
auditTrail.authenticationFailed(randomAsciiOfLengthBetween(2, 10), token, restRequest);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
// without the realm
auditTrail.authenticationFailed(token, restRequest);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
// without the token
auditTrail.authenticationFailed(restRequest);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(token, restRequest);
}
public void testAccessGrantedMuted() {
createAuditTrail(new String[] { "access_granted" });
TransportMessage message = mock(TransportMessage.class);
User user = mock(User.class);
auditTrail.accessGranted(user, randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message, user);
}
public void testSystemAccessGrantedMuted() {
createAuditTrail(randomFrom(new String[] { "access_granted" }, null));
TransportMessage message = mock(TransportMessage.class);
User user = SystemUser.INSTANCE;
auditTrail.accessGranted(user, "internal:foo", message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message);
}
public void testAccessDeniedMuted() {
createAuditTrail(new String[] { "access_denied" });
TransportMessage message = mock(TransportMessage.class);
User user = mock(User.class);
auditTrail.accessDenied(user, randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message, user);
}
public void testTamperedRequestMuted() {
createAuditTrail(new String[] { "tampered_request" });
TransportMessage message = mock(TransportMessage.class);
User user = mock(User.class);
// with user
auditTrail.tamperedRequest(user, randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
// without user
auditTrail.tamperedRequest(randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message, user);
}
public void testConnectionGrantedMuted() {
createAuditTrail(new String[] { "connection_granted" });
InetAddress address = mock(InetAddress.class);
ShieldIpFilterRule rule = mock(ShieldIpFilterRule.class);
auditTrail.connectionGranted(address, randomAsciiOfLengthBetween(1, 12), rule);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(address, rule);
}
public void testConnectionDeniedMuted() {
createAuditTrail(new String[] { "connection_denied" });
InetAddress address = mock(InetAddress.class);
ShieldIpFilterRule rule = mock(ShieldIpFilterRule.class);
auditTrail.connectionDenied(address, randomAsciiOfLengthBetween(1, 12), rule);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(address, rule);
}
public void testRunAsGrantedMuted() {
createAuditTrail(new String[] { "run_as_granted" });
TransportMessage message = mock(TransportMessage.class);
User user = mock(User.class);
auditTrail.runAsGranted(user, randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message, user);
}
public void testRunAsDeniedMuted() {
createAuditTrail(new String[] { "run_as_denied" });
TransportMessage message = mock(TransportMessage.class);
User user = mock(User.class);
auditTrail.runAsDenied(user, randomAsciiOfLengthBetween(6, 40), message);
assertThat(messageEnqueued.get(), is(false));
assertThat(clientCalled.get(), is(false));
verifyZeroInteractions(message, user);
}
IndexAuditTrail createAuditTrail(String[] excludes) {
Settings settings = IndexAuditTrailTests.levelSettings(null, excludes);
auditTrail = new IndexAuditTrail(settings, transport, Providers.of(client), threadPool, mock(ClusterService.class)) {
@Override
void putTemplate(Settings settings) {
// make this a no-op so we don't have to stub out unnecessary client activities
}
@Override
BlockingQueue<Message> createQueue(int maxQueueSize) {
return new LinkedBlockingQueue<Message>(maxQueueSize) {
@Override
public boolean offer(Message message) {
messageEnqueued.set(true);
return super.offer(message);
}
};
}
};
auditTrail.start(true);
assertThat(auditTrail.state(), is(State.STARTED));
return auditTrail;
}
}

View File

@ -5,14 +5,17 @@
*/
package org.elasticsearch.shield.audit.index;
import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
@ -21,13 +24,13 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.shield.Security;
import org.elasticsearch.shield.user.SystemUser;
import org.elasticsearch.shield.user.User;
import org.elasticsearch.shield.authc.AuthenticationToken;
import org.elasticsearch.shield.crypto.InternalCryptoService;
import org.elasticsearch.shield.transport.filter.IPFilter;
import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule;
import org.elasticsearch.test.ESIntegTestCase;
@ -39,11 +42,13 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInfo;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.XPackPlugin;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
@ -61,8 +66,6 @@ import static org.elasticsearch.shield.audit.index.IndexNameResolver.Rollover.MO
import static org.elasticsearch.shield.audit.index.IndexNameResolver.Rollover.WEEKLY;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.test.InternalTestCluster.clusterName;
import static org.elasticsearch.test.ShieldSettingsSource.DEFAULT_PASSWORD;
import static org.elasticsearch.test.ShieldSettingsSource.DEFAULT_USER_NAME;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -74,26 +77,120 @@ import static org.mockito.Mockito.when;
/**
*
*/
//test is just too slow, please fix it to not be sleep-based
@BadApple(bugUrl = "https://github.com/elastic/x-plugins/issues/1007")
@ESIntegTestCase.ClusterScope(scope = SUITE, numDataNodes = 1)
public class IndexAuditTrailTests extends ShieldIntegTestCase {
public static final String SECOND_CLUSTER_NODE_PREFIX = "remote_" + SUITE_CLUSTER_NODE_PREFIX;
private static boolean remoteIndexing;
private static InternalTestCluster remoteCluster;
private static Settings remoteSettings;
private static byte[] systemKey;
private IndexNameResolver.Rollover rollover;
private IndexAuditTrail auditor;
private boolean remoteIndexing = false;
private InternalTestCluster cluster2;
private Client remoteClient;
private int numShards;
private int numReplicas;
private ThreadPool threadPool;
@BeforeClass
public static void configureBeforeClass() {
remoteIndexing = randomBoolean();
systemKey = InternalCryptoService.generateKey();
if (remoteIndexing == false) {
remoteSettings = Settings.EMPTY;
}
}
@AfterClass
public static void cleanupAfterTest() {
if (remoteCluster != null) {
remoteCluster.close();
remoteCluster = null;
}
remoteSettings = null;
}
@Before
public void initializeRemoteClusterIfNecessary() throws Exception {
if (remoteIndexing == false) {
logger.info("--> remote indexing disabled.");
return;
}
if (remoteCluster != null) {
return;
}
// create another cluster
String cluster2Name = clusterName(Scope.SUITE.name(), randomLong());
// Setup a second test cluster with randomization for number of nodes, shield enabled, and SSL
final int numNodes = randomIntBetween(1, 2);
final boolean useShield = randomBoolean();
final boolean useSSL = useShield && randomBoolean();
logger.info("--> remote indexing enabled. shield enabled: [{}], SSL enabled: [{}], nodes: [{}]", useShield, useSSL, numNodes);
ShieldSettingsSource cluster2SettingsSource =
new ShieldSettingsSource(numNodes, useSSL, systemKey(), createTempDir(), Scope.SUITE) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Security.enabledSetting(), useShield);
return builder.build();
}
};
remoteCluster = new InternalTestCluster("network", randomLong(), createTempDir(), numNodes, numNodes, cluster2Name,
cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(),
useShield ? getClientWrapper() : Function.identity());
remoteCluster.beforeTest(random(), 0.5);
NodesInfoResponse response = remoteCluster.client().admin().cluster().prepareNodesInfo().execute().actionGet();
TransportInfo info = response.getNodes()[0].getTransport();
InetSocketTransportAddress inet = (InetSocketTransportAddress) info.address().publishAddress();
Settings.Builder builder = Settings.builder()
.put(Security.enabledSetting(), useShield)
.put(remoteSettings(NetworkAddress.format(inet.address().getAddress()), inet.address().getPort(), cluster2Name))
.put("xpack.security.audit.index.client.xpack.security.user", ShieldSettingsSource.DEFAULT_USER_NAME + ":" +
ShieldSettingsSource.DEFAULT_PASSWORD);
if (useSSL) {
for (Map.Entry<String, String> entry : cluster2SettingsSource.getClientSSLSettings().getAsMap().entrySet()) {
builder.put("xpack.security.audit.index.client." + entry.getKey(), entry.getValue());
}
}
remoteSettings = builder.build();
}
@After
public void afterTest() {
if (threadPool != null) {
threadPool.shutdown();
}
if (auditor != null) {
auditor.close();
}
if (remoteCluster != null) {
remoteCluster.wipe(Collections.<String>emptySet());
}
}
@Override
protected Set<String> excludeTemplates() {
return Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME);
}
@Override
protected byte[] systemKey() {
return systemKey;
}
@Override
protected int maximumNumberOfShards() {
return 3;
}
private Settings commonSettings(IndexNameResolver.Rollover rollover) {
return Settings.builder()
.put("xpack.security.audit.enabled", true)
@ -106,14 +203,14 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
.build();
}
private Settings remoteSettings(String address, int port, String clusterName) {
static Settings remoteSettings(String address, int port, String clusterName) {
return Settings.builder()
.put("xpack.security.audit.index.client.hosts", address + ":" + port)
.put("xpack.security.audit.index.client.cluster.name", clusterName)
.build();
}
private Settings levelSettings(String[] includes, String[] excludes) {
static Settings levelSettings(String[] includes, String[] excludes) {
Settings.Builder builder = Settings.builder();
if (includes != null) {
builder.putArray("xpack.security.audit.index.events.include", includes);
@ -132,92 +229,33 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
}
private Client getClient() {
return remoteIndexing ? remoteClient : client();
return remoteIndexing ? remoteCluster.client() : client();
}
private void initialize(String... excludes) throws IOException, InterruptedException {
initialize(null, excludes);
private void initialize() throws IOException, InterruptedException {
initialize(null, null);
}
private void initialize(String[] includes, String[] excludes) throws IOException, InterruptedException {
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
numReplicas = numberOfReplicas();
numShards = numberOfShards();
Settings settings = settings(rollover, includes, excludes);
remoteIndexing = randomBoolean();
Settings.Builder builder = Settings.builder();
if (remoteIndexing) {
// create another cluster
String cluster2Name = clusterName(Scope.SUITE.name(), randomLong());
// Setup a second test cluster with randomization for number of nodes, shield enabled, and SSL
final int numNodes = randomIntBetween(1, 2);
final boolean useShield = randomBoolean();
final boolean useSSL = useShield && randomBoolean();
logger.info("--> remote indexing enabled. shield enabled: [{}], SSL enabled: [{}]", useShield, useSSL);
ShieldSettingsSource cluster2SettingsSource = new ShieldSettingsSource(numNodes, useSSL, systemKey(), createTempDir(),
Scope.SUITE) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(XPackPlugin.featureEnabledSetting(Security.NAME), useShield);
return builder.build();
}
};
cluster2 = new InternalTestCluster("network", randomLong(), createTempDir(), numNodes, numNodes, cluster2Name,
cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(),
useShield ? getClientWrapper() : Function.identity());
cluster2.beforeTest(random(), 0.5);
remoteClient = cluster2.client();
NodesInfoResponse response = remoteClient.admin().cluster().prepareNodesInfo().execute().actionGet();
TransportInfo info = response.getNodes()[0].getTransport();
InetSocketTransportAddress inet = (InetSocketTransportAddress) info.address().publishAddress();
Settings.Builder builder = Settings.builder()
.put(settings)
.put(XPackPlugin.featureEnabledSetting(Security.NAME), useShield)
.put(remoteSettings(NetworkAddress.format(inet.address().getAddress()), inet.address().getPort(), cluster2Name))
.put("xpack.security.audit.index.client." + Security.USER_SETTING.getKey(), DEFAULT_USER_NAME + ":" + DEFAULT_PASSWORD);
if (useSSL) {
for (Map.Entry<String, String> entry : cluster2SettingsSource.getClientSSLSettings().getAsMap().entrySet()) {
builder.put("xpack.security.audit.index.client." + entry.getKey(), entry.getValue());
}
}
settings = builder.build();
builder.put(remoteSettings);
}
settings = Settings.builder().put(settings).put("path.home", createTempDir()).build();
Settings settings = builder.put(settings(rollover, includes, excludes)).build();
logger.info("--> settings: [{}]", settings.getAsMap().toString());
Transport transport = mock(Transport.class);
BoundTransportAddress boundTransportAddress = new BoundTransportAddress(new TransportAddress[]{DummyTransportAddress.INSTANCE},
DummyTransportAddress.INSTANCE);
when(transport.boundAddress()).thenReturn(boundTransportAddress);
threadPool = new ThreadPool("index audit trail tests");
auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class));
auditor.start(true);
}
@After
public void afterTest() {
if (threadPool != null) {
threadPool.shutdown();
}
if (auditor != null) {
auditor.close();
}
cluster().wipe(Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME));
if (remoteIndexing && cluster2 != null) {
cluster2.wipe(Collections.singleton(IndexAuditTrail.INDEX_TEMPLATE_NAME));
remoteClient.close();
cluster2.close();
}
}
public void testAnonymousAccessDeniedTransport() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -242,18 +280,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testAnonymousAccessDeniedTransportMuted() throws Exception {
initialize("anonymous_access_denied");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.anonymousAccessDenied("_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAnonymousAccessDeniedRest() throws Exception {
initialize();
RestRequest request = mockRestRequest();
@ -270,18 +296,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertThat(sourceMap.get("request_body"), notNullValue());
}
public void testAnonymousAccessDeniedRestMuted() throws Exception {
initialize("anonymous_access_denied");
RestRequest request = mockRestRequest();
auditor.anonymousAccessDenied(request);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedTransport() throws Exception {
initialize();
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
@ -330,30 +344,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testAuthenticationFailed_Transport_Muted() throws Exception {
initialize("authentication_failed");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.authenticationFailed(new MockToken(), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedTransportNoTokenMuted() throws Exception {
initialize("authentication_failed");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.authenticationFailed("_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedRest() throws Exception {
initialize();
RestRequest request = mockRestRequest();
@ -388,30 +378,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertThat(sourceMap.get("request_body"), notNullValue());
}
public void testAuthenticationFailedRestMuted() throws Exception {
initialize("authentication_failed");
RestRequest request = mockRestRequest();
auditor.authenticationFailed(new MockToken(), request);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedRestNoTokenMuted() throws Exception {
initialize("authentication_failed");
RestRequest request = mockRestRequest();
auditor.authenticationFailed(request);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedTransportRealm() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -440,18 +406,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testAuthenticationFailedTransportRealmMuted() throws Exception {
initialize("authentication_failed");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.authenticationFailed("_realm", new MockToken(), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAuthenticationFailedRestRealm() throws Exception {
initialize();
RestRequest request = mockRestRequest();
@ -469,18 +423,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertThat(sourceMap.get("request_body"), notNullValue());
}
public void testAuthenticationFailedRestRealmMuted() throws Exception {
initialize("authentication_failed");
RestRequest request = mockRestRequest();
auditor.authenticationFailed("_realm", new MockToken(), request);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAccessGranted() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -513,17 +455,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testAccessGrantedMuted() throws Exception {
initialize("access_granted");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.accessGranted(new User("_username", "r1"), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testSystemAccessGranted() throws Exception {
initialize(new String[] { "system_access_granted" }, null);
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
@ -539,18 +470,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testSystemAccessGrantedMuted() throws Exception {
initialize();
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
auditor.accessGranted(SystemUser.INSTANCE, "internal:_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testAccessDenied() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -583,18 +502,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testAccessDenied_Muted() throws Exception {
initialize("access_denied");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.accessDenied(new User("_username", "r1"), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testTamperedRequestRest() throws Exception {
initialize();
RestRequest request = mockRestRequest();
@ -655,32 +562,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testTamperedRequestMuted() throws Exception {
initialize("tampered_request");
TransportRequest message = new RemoteHostMockTransportRequest();
final int type = randomIntBetween(0, 2);
switch (type) {
case 0:
auditor.tamperedRequest(new User("_username", new String[]{"r1"}), "_action", message);
break;
case 1:
auditor.tamperedRequest("_action", message);
break;
case 2:
auditor.tamperedRequest(mockRestRequest());
break;
default:
throw new IllegalStateException("invalid value for type: " + type);
}
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testConnectionGranted() throws Exception {
initialize();
InetAddress inetAddress = InetAddress.getLoopbackAddress();
@ -696,19 +577,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals("default", sourceMap.get("transport_profile"));
}
public void testConnectionGrantedMuted() throws Exception {
initialize("connection_granted");
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = IPFilter.DEFAULT_PROFILE_ACCEPT_ALL;
auditor.connectionGranted(inetAddress, "default", rule);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testConnectionDenied() throws Exception {
initialize();
InetAddress inetAddress = InetAddress.getLoopbackAddress();
@ -724,19 +592,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals("default", sourceMap.get("transport_profile"));
}
public void testConnectionDeniedMuted() throws Exception {
initialize("connection_denied");
InetAddress inetAddress = InetAddress.getLoopbackAddress();
ShieldIpFilterRule rule = new ShieldIpFilterRule(false, "_all");
auditor.connectionDenied(inetAddress, "default", rule);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testRunAsGranted() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -754,18 +609,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testRunAsGrantedMuted() throws Exception {
initialize("run_as_granted");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.runAsGranted(new User("_username", new String[]{"r1"}, new User("running as", new String[]{"r2"})), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
public void testRunAsDenied() throws Exception {
initialize();
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
@ -783,18 +626,6 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertEquals(sourceMap.get("request"), message.getClass().getSimpleName());
}
public void testRunAsDeniedMuted() throws Exception {
initialize("run_as_denied");
TransportMessage message = randomFrom(new RemoteHostMockMessage(), new LocalHostMockMessage(), new MockIndicesTransportMessage());
auditor.runAsDenied(new User("_username", new String[]{"r1"}, new User("running as", new String[]{"r2"})), "_action", message);
try {
getClient().prepareSearch(resolveIndexName()).setSize(0).setTerminateAfter(1).execute().actionGet();
fail("Expected IndexNotFoundException");
} catch (IndexNotFoundException e) {
assertThat(e.getMessage(), is("no such index"));
}
}
private void assertAuditMessage(SearchHit hit, String layer, String type) {
Map<String, Object> sourceMap = hit.sourceAsMap();
assertThat(sourceMap.get("@timestamp"), notNullValue());
@ -901,10 +732,11 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
}
private void awaitAuditDocumentCreation(final String indexName) throws InterruptedException {
ensureYellow(indexName);
boolean found = awaitBusy(() -> {
try {
SearchResponse searchResponse = getClient().prepareSearch(indexName).setSize(0).setTerminateAfter(1).execute().actionGet();
return searchResponse.getHits().totalHits() > 0;
return searchResponse.getHits().totalHits() > 0L;
} catch (Exception e) {
return false;
}
@ -916,6 +748,26 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
assertThat(response.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas)));
}
@Override
public ClusterHealthStatus ensureYellow(String... indices) {
if (remoteIndexing == false) {
return super.ensureYellow(indices);
}
// pretty ugly but just a rip of ensureYellow that uses a different client
ClusterHealthResponse actionGet = getClient().admin().cluster().health(Requests.clusterHealthRequest(indices)
.waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureYellow timed out, cluster state:\n{}\n{}",
getClient().admin().cluster().prepareState().get().getState().prettyPrint(),
getClient().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for yellow", actionGet.isTimedOut(), equalTo(false));
}
logger.debug("indices {} are yellow", indices.length == 0 ? "[_all]" : indices);
return actionGet.getStatus();
}
private String resolveIndexName() {
return IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover);
}