security: simplify index audit trail stopping

The IndexAuditTrail had both a stop and close method that needed to be called in order
to stop the service. There was a race where we called either flush or close in a non
blocking fashion and then immediately closed the underlying client. This change makes
the stop method wait for up to 10 seconds when closing the bulk processor.

Closes elastic/elasticsearch#3279

Original commit: elastic/x-pack-elasticsearch@0d776bc91a
This commit is contained in:
Jay Modi 2016-09-16 10:31:27 -04:00 committed by GitHub
parent efeb9cefce
commit a6d55f26c6
5 changed files with 28 additions and 55 deletions

View File

@ -59,11 +59,6 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
public void beforeStop() {
stop();
}
@Override
public void beforeClose() {
close();
}
});
}
@ -152,15 +147,4 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
}
}
}
public void close() {
// There is no .close() method for the roles module
if (indexAuditTrail != null) {
try {
indexAuditTrail.close();
} catch (Exception e) {
logger.error("failed to close audit trail module", e);
}
}
}
}

View File

@ -80,6 +80,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -282,33 +283,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
}
}
public void stop() {
public synchronized void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
queueConsumer.interrupt();
}
if (state() != State.STOPPED) {
try {
queueConsumer.interrupt();
if (bulkProcessor != null) {
bulkProcessor.flush();
if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) {
logger.warn("index audit trail failed to store all pending events after waiting for 10s");
}
}
} catch (InterruptedException exc) {
Thread.currentThread().interrupt();
} finally {
state.set(State.STOPPED);
}
}
}
public void close() {
if (state.get() != State.STOPPED) {
stop();
}
try {
if (bulkProcessor != null) {
bulkProcessor.close();
}
} finally {
if (indexToRemoteCluster) {
if (client != null) {
if (indexToRemoteCluster) {
client.close();
}
state.set(State.STOPPED);
}
}
}

View File

@ -80,7 +80,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
@After
public void stop() {
if (auditTrail != null) {
auditTrail.close();
auditTrail.stop();
}
if (transportClient != null) {
transportClient.close();

View File

@ -184,7 +184,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
threadPool.shutdown();
}
if (auditor != null) {
auditor.close();
auditor.stop();
}
if (remoteCluster != null) {

View File

@ -16,8 +16,6 @@ import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.HOURLY;
@ -35,8 +33,9 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
private ThreadPool threadPool;
private IndexAuditTrail auditor;
@Before
public void setup() {
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("index audit trail update mapping tests");
}
@ -63,17 +62,13 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
// default mapping
GetMappingsResponse response = client().admin().indices().prepareGetMappings(indexName).get();
try {
// start the audit trail which should update the mappings since it is the master
auditor.start(true);
// start the audit trail which should update the mappings since it is the master
auditor.start(true);
// get the updated mappings
GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue());
} finally {
auditor.close();
}
// get the updated mappings
GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue());
}
@Override
@ -81,13 +76,14 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
// no-op here because of the shard counter check
}
@After
public void shutdown() {
@Override
public void tearDown() throws Exception {
super.tearDown();
if (auditor != null) {
auditor.stop();
}
if (threadPool != null) {
threadPool.shutdownNow();
terminate(threadPool);
}
}
}