From 478d5ecc486ef249b6e410c22f6e95bfe72d9927 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 1 Jun 2015 10:41:19 +0200 Subject: [PATCH] Stop the TriggeredWatchStore and HistoryStore if there are no current executions. There may be current executions still going on during stopping. We should try to wait for the current executions to have completed. Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index, but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException, because we already stored the history record during shutdown... Introduced `CurrentExecutions` to handle this synchronization. (we always first store the watch record and then remove the triggered watch) Original commit: elastic/x-pack-elasticsearch@89c4a8d8ad90f81dafe9eb02cc957e205f51f44d --- .../watcher/execution/CurrentExecutions.java | 74 +++++++++++++++++++ .../watcher/execution/ExecutionService.java | 19 +++-- 2 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java diff --git a/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java b/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java new file mode 100644 index 00000000000..9376970a580 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class CurrentExecutions implements Iterable { + + private final ConcurrentMap currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition empty = lock.newCondition(); + private boolean seal = false; + + public void put(String id, ExecutionService.WatchExecution execution) { + lock.lock(); + try { + if (seal) { + // We shouldn't get here, because, ExecutionService#started should have been set to false + throw new ElasticsearchIllegalStateException("put execution forbidden, because we're sealed"); + } + currentExecutions.put(id, execution); + } finally { + lock.unlock(); + } + } + + public void remove(String id) { + lock.lock(); + try { + currentExecutions.remove(id); + if (currentExecutions.isEmpty()) { + empty.signal(); + } + } finally { + lock.unlock(); + } + } + + public void sealAndAwaitEmpty(TimeValue maxStopTimeout) { + // We may have current executions still going on. + // We should try to wait for the current executions to have completed. + // Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index, + // but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException, + // because we already stored the history record during shutdown... + // (we always first store the watch record and then remove the triggered watch) + lock.lock(); + try { + seal = true; + while (currentExecutions.size() > 0) { + empty.await(maxStopTimeout.millis(), TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + lock.unlock(); + } + } + + @Override + public Iterator iterator() { + return currentExecutions.values().iterator(); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 756ab5cc1ec..d453f7fbe2d 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -14,13 +14,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.condition.Condition; -import org.elasticsearch.watcher.history.*; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation; @@ -31,7 +31,7 @@ import org.elasticsearch.watcher.watch.WatchStore; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; @@ -40,6 +40,9 @@ import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; */ public class ExecutionService extends AbstractComponent { + private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "watcher.stop.timeout"; + private final HistoryStore historyStore; private final TriggeredWatchStore triggeredWatchStore; private final WatchExecutor executor; @@ -47,9 +50,9 @@ public class ExecutionService extends AbstractComponent { private final WatchLockService watchLockService; private final Clock clock; private final TimeValue defaultThrottlePeriod; + private final TimeValue maxStopTimeout; - private final ConcurrentMap currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - + private volatile CurrentExecutions currentExecutions = null; private final AtomicBoolean started = new AtomicBoolean(false); @Inject @@ -63,6 +66,7 @@ public class ExecutionService extends AbstractComponent { this.watchLockService = watchLockService; this.clock = clock; this.defaultThrottlePeriod = componentSettings.getAsTime("default_throttle_period", TimeValue.timeValueSeconds(5)); + maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT); if (ExecutionService.this.defaultThrottlePeriod.millis() < 0) { settingsValidation.addError("watcher.execution.default_throttle_period", "time value cannot be negative"); } @@ -78,6 +82,7 @@ public class ExecutionService extends AbstractComponent { logger.debug("starting execution service"); historyStore.start(); triggeredWatchStore.start(); + currentExecutions = new CurrentExecutions(); Collection records = triggeredWatchStore.loadTriggeredWatches(state); executeRecords(records); logger.debug("started execution service"); @@ -95,6 +100,8 @@ public class ExecutionService extends AbstractComponent { // this is a forceful shutdown that also interrupts the worker threads in the threadpool List cancelledTasks = new ArrayList<>(); executor.queue().drainTo(cancelledTasks); + + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); triggeredWatchStore.stop(); historyStore.stop(); logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); @@ -120,7 +127,7 @@ public class ExecutionService extends AbstractComponent { public List currentExecutions() { List currentExecutions = new ArrayList<>(); - for (WatchExecution watchExecution : this.currentExecutions.values()) { + for (WatchExecution watchExecution : this.currentExecutions) { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: