diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java index 4fa161df6c7..41ca0a6d118 100644 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java +++ b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java @@ -59,11 +59,6 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust public void beforeStop() { stop(); } - - @Override - public void beforeClose() { - close(); - } }); } @@ -152,15 +147,4 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust } } } - - public void close() { - // There is no .close() method for the roles module - if (indexAuditTrail != null) { - try { - indexAuditTrail.close(); - } catch (Exception e) { - logger.error("failed to close audit trail module", e); - } - } - } } diff --git a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java index 9ab4e9fac4b..aacfc8c1ee7 100644 --- a/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java +++ b/elasticsearch/x-pack/security/src/main/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrail.java @@ -80,6 +80,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -282,33 +283,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl } } - public void stop() { + public synchronized void stop() { if (state.compareAndSet(State.STARTED, State.STOPPING)) { + queueConsumer.interrupt(); + } + + if (state() != State.STOPPED) { try { - queueConsumer.interrupt(); if (bulkProcessor != null) { - bulkProcessor.flush(); + if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) { + logger.warn("index audit trail failed to store all pending events after waiting for 10s"); + } } + } catch (InterruptedException exc) { + Thread.currentThread().interrupt(); } finally { - state.set(State.STOPPED); - } - } - } - - public void close() { - if (state.get() != State.STOPPED) { - stop(); - } - - try { - if (bulkProcessor != null) { - bulkProcessor.close(); - } - } finally { - if (indexToRemoteCluster) { - if (client != null) { + if (indexToRemoteCluster) { client.close(); } + state.set(State.STOPPED); } } } diff --git a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailMutedTests.java b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailMutedTests.java index 208ca1ad498..a53185305ab 100644 --- a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailMutedTests.java +++ b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailMutedTests.java @@ -80,7 +80,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase { @After public void stop() { if (auditTrail != null) { - auditTrail.close(); + auditTrail.stop(); } if (transportClient != null) { transportClient.close(); diff --git a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java index 346245c4562..16da2780268 100644 --- a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java +++ b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java @@ -184,7 +184,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase { threadPool.shutdown(); } if (auditor != null) { - auditor.close(); + auditor.stop(); } if (remoteCluster != null) { diff --git a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailUpdateMappingTests.java b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailUpdateMappingTests.java index 77d62a66e28..04f8c07db39 100644 --- a/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailUpdateMappingTests.java +++ b/elasticsearch/x-pack/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailUpdateMappingTests.java @@ -16,8 +16,6 @@ import org.elasticsearch.test.SecurityIntegTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY; import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.HOURLY; @@ -35,8 +33,9 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase { private ThreadPool threadPool; private IndexAuditTrail auditor; - @Before - public void setup() { + @Override + public void setUp() throws Exception { + super.setUp(); threadPool = new TestThreadPool("index audit trail update mapping tests"); } @@ -63,17 +62,13 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase { // default mapping GetMappingsResponse response = client().admin().indices().prepareGetMappings(indexName).get(); - try { - // start the audit trail which should update the mappings since it is the master - auditor.start(true); + // start the audit trail which should update the mappings since it is the master + auditor.start(true); - // get the updated mappings - GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get(); - assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue()); - assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue()); - } finally { - auditor.close(); - } + // get the updated mappings + GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get(); + assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue()); + assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue()); } @Override @@ -81,13 +76,14 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase { // no-op here because of the shard counter check } - @After - public void shutdown() { + @Override + public void tearDown() throws Exception { + super.tearDown(); if (auditor != null) { auditor.stop(); } if (threadPool != null) { - threadPool.shutdownNow(); + terminate(threadPool); } } }