Do not execute blocking calls on the cluster state update thread

This commit stops the index audit trail from executing blocking calls on the cluster
state update thread. When indexing to a remote cluster, blocking calls were made to
fetch that cluster's state and, possibly, to put the index template and mappings.

Closes elastic/elasticsearch#3989

Original commit: elastic/x-pack-elasticsearch@fc42efc639
This commit is contained in:
Jay Modi 2016-11-14 12:52:29 -05:00 committed by GitHub
commit 64288e9f37
3 changed files with 109 additions and 73 deletions

View File

@ -9,12 +9,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -38,7 +35,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -80,9 +76,8 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static org.elasticsearch.xpack.security.Security.setting;
@ -165,7 +160,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
private final BlockingQueue<Message> eventQueue;
private final QueueConsumer queueConsumer;
private final ThreadPool threadPool;
private final Lock putMappingLock = new ReentrantLock();
private final AtomicBoolean putTemplatePending = new AtomicBoolean(false);
private final ClusterService clusterService;
private final boolean indexToRemoteCluster;
private final EnumSet<AuditLevel> events;
@ -225,8 +220,10 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
*/
public synchronized boolean canStart(ClusterChangedEvent event, boolean master) {
if (indexToRemoteCluster) {
ClusterStateResponse response = client.admin().cluster().prepareState().execute().actionGet();
return canStart(response.getState(), master);
// just return true as we do not determine whether we can start or not based on the local cluster state, but must base it off
// of the remote cluster state and this method is called on the cluster state update thread, so we do not really want to
// execute remote calls on this thread
return true;
}
return canStart(event.state(), master);
}
@ -271,17 +268,49 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
this.nodeHostName = clusterService.localNode().getHostName();
this.nodeHostAddress = clusterService.localNode().getHostAddress();
if (indexToRemoteCluster) {
client.admin().cluster().prepareState().execute(new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
final boolean currentMaster = clusterService.state().getNodes().isLocalNodeElectedMaster();
if (canStart(clusterStateResponse.getState(), currentMaster)) {
if (currentMaster) {
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
(e) -> state.set(State.FAILED)));
} else {
innerStart();
}
} else {
if (state.compareAndSet(State.STARTING, State.INITIALIZED) == false) {
throw new IllegalStateException("state transition from starting to initialized failed, current value: " +
state.get());
}
}
}
if (master) {
putTemplate(customAuditIndexSettings(settings));
@Override
public void onFailure(Exception e) {
logger.error("failed to get remote cluster state", e);
}
});
} else if (master) {
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
(e) -> state.set(State.FAILED)));
} else {
innerStart();
}
this.clusterService.add(this);
initializeBulkProcessor();
queueConsumer.start();
state.set(State.STARTED);
}
}
/**
 * Completes start-up once any asynchronous preparation has finished: optionally registers
 * this instance with the cluster service, initializes the bulk processor, starts the queue
 * consumer, and finally moves the state to {@link State#STARTED}.
 */
private void innerStart() {
    // only register with the cluster service when indexing to the local cluster
    if (!indexToRemoteCluster) {
        clusterService.add(this);
    }
    initializeBulkProcessor();
    queueConsumer.start();
    // flip the state last, after the processor and consumer have been set up
    state.set(State.STARTED);
}
public synchronized void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
queueConsumer.interrupt();
@ -798,12 +827,12 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
return builder.build();
}
void putTemplate(Settings customSettings) {
void putTemplate(Settings customSettings, ActionListener<Void> listener) {
try (InputStream is = getClass().getResourceAsStream("/" + INDEX_TEMPLATE_NAME + ".json")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, out);
final byte[] template = out.toByteArray();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
@ -812,40 +841,55 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
request.settings(updatedSettings);
}
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet();
if (!response.isAcknowledged()) {
throw new IllegalStateException("failed to put index template for audit logging");
}
// now we may need to update the mappings of the current index
DateTime dateTime;
Message message = eventQueue.peek();
if (message != null) {
dateTime = message.timestamp;
} else {
dateTime = DateTime.now(DateTimeZone.UTC);
}
String index = resolve(INDEX_NAME_PREFIX, dateTime, rollover);
IndicesExistsRequest existsRequest = new IndicesExistsRequest(index);
if (client.admin().indices().exists(existsRequest).get().isExists()) {
logger.debug("index [{}] exists so we need to update mappings", index);
PutMappingRequest putMappingRequest = new PutMappingRequest(index).type(DOC_TYPE).source(request.mappings().get(DOC_TYPE));
PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
if (!putMappingResponse.isAcknowledged()) {
throw new IllegalStateException("failed to put mappings for audit logging index [" + index + "]");
}
} else {
logger.debug("index [{}] does not exist so we do not need to update mappings", index);
}
client.admin().indices().putTemplate(request, ActionListener.wrap((response) -> {
if (response.isAcknowledged()) {
// now we may need to update the mappings of the current index
final DateTime dateTime;
final Message message = eventQueue.peek();
if (message != null) {
dateTime = message.timestamp;
} else {
dateTime = DateTime.now(DateTimeZone.UTC);
}
final String index = resolve(INDEX_NAME_PREFIX, dateTime, rollover);
checkIfCurrentIndexExists(index, request, listener);
} else {
listener.onFailure(new IllegalStateException("failed to put index template for audit logging"));
}
}, listener::onFailure));
} catch (Exception e) {
logger.debug("unexpected exception while putting index template", e);
throw new IllegalStateException("failed to load [" + INDEX_TEMPLATE_NAME + ".json]", e);
listener.onFailure(e);
}
}
/**
 * Asynchronously checks whether the given index exists. When it does, the mappings of that
 * index are updated via {@code putAuditIndexMappings}; when it does not, the listener is
 * completed immediately.
 *
 * @param index the name of the index to look up
 * @param indexTemplateRequest the template request that is passed on for the mapping update
 * @param listener completed with {@code null} when no mapping update is needed, otherwise
 *                 handed on to the mapping update; receives any failure of the exists call
 */
private void checkIfCurrentIndexExists(String index, PutIndexTemplateRequest indexTemplateRequest, ActionListener<Void> listener) {
    client.admin().indices().prepareExists(index).execute(ActionListener.wrap((existsResponse) -> {
        if (!existsResponse.isExists()) {
            // nothing to update: new indices are not touched here
            logger.debug("index [{}] does not exist so we do not need to update mappings", index);
            listener.onResponse(null);
            return;
        }
        logger.debug("index [{}] exists so we need to update mappings", index);
        putAuditIndexMappings(index, indexTemplateRequest, listener);
    }, listener::onFailure));
}
/**
 * Asynchronously puts the {@code DOC_TYPE} mappings taken from the given template request
 * onto the given index.
 *
 * @param index the index whose mappings are updated
 * @param request the template request that supplies the {@code DOC_TYPE} mapping source
 * @param listener completed with {@code null} when the update is acknowledged; failed with an
 *                 {@link IllegalStateException} when it is not acknowledged, or with the
 *                 underlying exception when the request itself fails
 */
private void putAuditIndexMappings(String index, PutIndexTemplateRequest request, ActionListener<Void> listener) {
    client.admin().indices().preparePutMapping(index)
            .setType(DOC_TYPE)
            .setSource(request.mappings().get(DOC_TYPE))
            .execute(ActionListener.wrap((putMappingResponse) -> {
                if (!putMappingResponse.isAcknowledged()) {
                    listener.onFailure(new IllegalStateException("failed to put mappings for audit logging index [" + index + "]"));
                    return;
                }
                listener.onResponse(null);
            }, listener::onFailure));
}
/**
 * Creates the bounded queue that holds audit messages.
 *
 * @param maxQueueSize the capacity of the queue; {@link LinkedBlockingQueue} rejects values
 *                     that are not greater than zero with an {@link IllegalArgumentException}
 * @return a new, empty {@link LinkedBlockingQueue} with the given capacity
 */
BlockingQueue<Message> createQueue(int maxQueueSize) {
    return new LinkedBlockingQueue<Message>(maxQueueSize);
}
@ -882,36 +926,25 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
// this could be handled by a template registry service but adding that is extra complexity until we actually need it
@Override
public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
State state = state();
if (state != State.STARTED || indexToRemoteCluster) {
return;
}
if (clusterChangedEvent.localNodeMaster() == false) {
return;
}
if (clusterChangedEvent.state().metaData().templates().get(INDEX_TEMPLATE_NAME) == null) {
assert indexToRemoteCluster == false;
if (state() == State.STARTED
&& clusterChangedEvent.localNodeMaster()
&& clusterChangedEvent.state().metaData().templates().get(INDEX_TEMPLATE_NAME) == null
&& putTemplatePending.compareAndSet(false, true)) {
logger.debug("security audit index template [{}] does not exist. it may have been deleted - putting the template",
INDEX_TEMPLATE_NAME);
threadPool.generic().execute(new AbstractRunnable() {
putTemplate(customAuditIndexSettings(settings), new ActionListener<Void>() {
@Override
public void onFailure(Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"failed to update security audit index template [{}]", INDEX_TEMPLATE_NAME), e);
public void onResponse(Void aVoid) {
putTemplatePending.set(false);
}
@Override
protected void doRun() throws Exception {
final boolean locked = putMappingLock.tryLock();
if (locked) {
try {
putTemplate(customAuditIndexSettings(settings));
} finally {
putMappingLock.unlock();
}
} else {
logger.trace("unable to PUT security audit index template as the lock is already held");
}
public void onFailure(Exception e) {
putTemplatePending.set(false);
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"failed to update security audit index template [{}]", INDEX_TEMPLATE_NAME), e);
}
});
}

View File

@ -296,8 +296,9 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
Settings settings = IndexAuditTrailTests.levelSettings(null, excludes);
auditTrail = new IndexAuditTrail(settings, client, threadPool, clusterService) {
@Override
void putTemplate(Settings settings) {
void putTemplate(Settings settings, ActionListener<Void> listener) {
// make this a no-op so we don't have to stub out unnecessary client activities
listener.onResponse(null);
}
@Override

View File

@ -16,6 +16,7 @@ import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State;
import org.junit.After;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY;
@ -62,13 +63,14 @@ public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
// default mapping
GetMappingsResponse response = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
// start the audit trail which should update the mappings since it is the master
auditor.start(true);
assertTrue(awaitBusy(() -> auditor.state() == State.STARTED));
// get the updated mappings
GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get();
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue());
}