Do not execute blocking calls on the cluster state update thread

This commit stops the index audit trail from executing blocking calls on the cluster
state update thread. Blocking calls were executed when indexing to a remote cluster
to get that cluster's state and also possibly put a template and mappings.

Closes elastic/elasticsearch#3989

Original commit: elastic/x-pack-elasticsearch@a8c0269fad
This commit is contained in:
jaymode 2016-11-07 14:23:05 -05:00
parent bb94f3a2b2
commit 32754b12c0
3 changed files with 105 additions and 73 deletions

View File

@ -9,12 +9,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -38,7 +35,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -80,9 +76,8 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static;
@ -165,7 +160,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private final BlockingQueue<Message> eventQueue;
private final QueueConsumer queueConsumer;
private final ThreadPool threadPool;
private final Lock putMappingLock = new ReentrantLock();
private final AtomicBoolean putTemplatePending = new AtomicBoolean(false);
private final ClusterService clusterService;
private final boolean indexToRemoteCluster;
private final EnumSet<AuditLevel> events;
@ -225,8 +220,8 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
public synchronized boolean canStart(ClusterChangedEvent event, boolean master) {
if (indexToRemoteCluster) {
ClusterStateResponse response = client.admin().cluster().prepareState().execute().actionGet();
return canStart(response.getState(), master);
// do not block here!
return true;
return canStart(event.state(), master);
@ -271,17 +266,47 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.nodeHostName = clusterService.localNode().getHostName();
this.nodeHostAddress = clusterService.localNode().getHostAddress();
if (indexToRemoteCluster) {
client.admin().cluster().prepareState().execute(new ActionListener<ClusterStateResponse>() {
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (canStart(clusterStateResponse.getState(), master)) {
if (master) {
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
(e) -> state.set(State.FAILED)));
} else {
} else {
state.compareAndSet(State.STARTING, State.INITIALIZED);
if (master) {
public void onFailure(Exception e) {
logger.error("failed to get remote cluster state", e);
} else {
if (master) {
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
(e) -> state.set(State.FAILED)));
} else {
private void innerStart() {
if (indexToRemoteCluster == false) {
public synchronized void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
@ -798,12 +823,12 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
void putTemplate(Settings customSettings) {
void putTemplate(Settings customSettings, ActionListener<Void> listener) {
try (InputStream is = getClass().getResourceAsStream("/" + INDEX_TEMPLATE_NAME + ".json")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
final byte[] template = out.toByteArray();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
@ -812,40 +837,55 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet();
if (!response.isAcknowledged()) {
throw new IllegalStateException("failed to put index template for audit logging");
// now we may need to update the mappings of the current index
DateTime dateTime;
Message message = eventQueue.peek();
if (message != null) {
dateTime = message.timestamp;
} else {
dateTime =;
String index = resolve(INDEX_NAME_PREFIX, dateTime, rollover);
IndicesExistsRequest existsRequest = new IndicesExistsRequest(index);
if (client.admin().indices().exists(existsRequest).get().isExists()) {
logger.debug("index [{}] exists so we need to update mappings", index);
PutMappingRequest putMappingRequest = new PutMappingRequest(index).type(DOC_TYPE).source(request.mappings().get(DOC_TYPE));
PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
if (!putMappingResponse.isAcknowledged()) {
throw new IllegalStateException("failed to put mappings for audit logging index [" + index + "]");
} else {
logger.debug("index [{}] does not exist so we do not need to update mappings", index);
client.admin().indices().putTemplate(request, ActionListener.wrap((response) -> {
if (response.isAcknowledged()) {
// now we may need to update the mappings of the current index
final DateTime dateTime;
final Message message = eventQueue.peek();
if (message != null) {
dateTime = message.timestamp;
} else {
dateTime =;
final String index = resolve(INDEX_NAME_PREFIX, dateTime, rollover);
checkIfCurrentIndexExists(index, request, listener);
} else {
listener.onFailure(new IllegalStateException("failed to put index template for audit logging"));
}, listener::onFailure));
} catch (Exception e) {
logger.debug("unexpected exception while putting index template", e);
throw new IllegalStateException("failed to load [" + INDEX_TEMPLATE_NAME + ".json]", e);
private void checkIfCurrentIndexExists(String index, PutIndexTemplateRequest indexTemplateRequest, ActionListener<Void> listener) {
client.admin().indices().prepareExists(index).execute(ActionListener.wrap((response) -> {
if (response.isExists()) {
logger.debug("index [{}] exists so we need to update mappings", index);
putAuditIndexMappings(index, indexTemplateRequest, listener);
} else {
logger.debug("index [{}] does not exist so we do not need to update mappings", index);
}, listener::onFailure));
private void putAuditIndexMappings(String index, PutIndexTemplateRequest request, ActionListener<Void> listener) {
.execute(ActionListener.wrap((response) -> {
if (response.isAcknowledged()) {
} else {
listener.onFailure(new IllegalStateException("failed to put mappings for audit logging index [" + index + "]"));
BlockingQueue<Message> createQueue(int maxQueueSize) {
return new LinkedBlockingQueue<>(maxQueueSize);
@ -882,36 +922,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
// this could be handled by a template registry service but adding that is extra complexity until we actually need it
public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
State state = state();
if (state != State.STARTED || indexToRemoteCluster) {
if (clusterChangedEvent.localNodeMaster() == false) {
if (clusterChangedEvent.state().metaData().templates().get(INDEX_TEMPLATE_NAME) == null) {
assert indexToRemoteCluster == false;
if (state() == State.STARTED
&& clusterChangedEvent.localNodeMaster()
&& clusterChangedEvent.state().metaData().templates().get(INDEX_TEMPLATE_NAME) == null
&& putTemplatePending.compareAndSet(false, true)) {
logger.debug("security audit index template [{}] does not exist. it may have been deleted - putting the template",
threadPool.generic().execute(new AbstractRunnable() {
putTemplate(customAuditIndexSettings(settings), new ActionListener<Void>() {
public void onFailure(Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"failed to update security audit index template [{}]", INDEX_TEMPLATE_NAME), e);
public void onResponse(Void aVoid) {
protected void doRun() throws Exception {
final boolean locked = putMappingLock.tryLock();
if (locked) {
try {
} finally {
} else {
logger.trace("unable to PUT security audit index template as the lock is already held");
public void onFailure(Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"failed to update security audit index template [{}]", INDEX_TEMPLATE_NAME), e);

View File

@ -296,8 +296,9 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
Settings settings = IndexAuditTrailTests.levelSettings(null, excludes);
auditTrail = new IndexAuditTrail(settings, client, threadPool, clusterService) {
void putTemplate(Settings settings) {
void putTemplate(Settings settings, ActionListener<Void> listener) {
// make this a no-op so we don't have to stub out unnecessary client activities

View File

@ -16,6 +16,7 @@ import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import static;
@ -62,13 +63,14 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
// default mapping
GetMappingsResponse response = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
// start the audit trail which should update the mappings since it is the master
assertTrue(awaitBusy(() -> auditor.state() == State.STARTED));
// get the updated mappings
GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue());