Stop the TriggeredWatchStore and HistoryStore if there are no current executions.

There may be current executions still going on during stopping.
We should try to wait for the current executions to have completed.
Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index,
but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException,
because we already stored the history record during shutdown...

Introduced `CurrentExecutions` to handle this synchronization.

(we always first store the watch record and then remove the triggered watch)

Original commit: elastic/x-pack-elasticsearch@89c4a8d8ad
This commit is contained in:
Martijn van Groningen 2015-06-01 10:41:19 +02:00 committed by uboness
parent e684194c23
commit 478d5ecc48
2 changed files with 87 additions and 6 deletions

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final ReentrantLock lock = new ReentrantLock();
private final Condition empty = lock.newCondition();
private boolean seal = false;
public void put(String id, ExecutionService.WatchExecution execution) {
lock.lock();
try {
if (seal) {
// We shouldn't get here, because, ExecutionService#started should have been set to false
throw new ElasticsearchIllegalStateException("put execution forbidden, because we're sealed");
}
currentExecutions.put(id, execution);
} finally {
lock.unlock();
}
}
public void remove(String id) {
lock.lock();
try {
currentExecutions.remove(id);
if (currentExecutions.isEmpty()) {
empty.signal();
}
} finally {
lock.unlock();
}
}
public void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
// We may have current executions still going on.
// We should try to wait for the current executions to have completed.
// Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index,
// but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException,
// because we already stored the history record during shutdown...
// (we always first store the watch record and then remove the triggered watch)
lock.lock();
try {
seal = true;
while (currentExecutions.size() > 0) {
empty.await(maxStopTimeout.millis(), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
@Override
public Iterator<ExecutionService.WatchExecution> iterator() {
return currentExecutions.values().iterator();
}
}

View File

@ -14,13 +14,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.history.*;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
@ -31,7 +31,7 @@ import org.elasticsearch.watcher.watch.WatchStore;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
@ -40,6 +40,9 @@ import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
*/
public class ExecutionService extends AbstractComponent {
private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "watcher.stop.timeout";
private final HistoryStore historyStore;
private final TriggeredWatchStore triggeredWatchStore;
private final WatchExecutor executor;
@ -47,9 +50,9 @@ public class ExecutionService extends AbstractComponent {
private final WatchLockService watchLockService;
private final Clock clock;
private final TimeValue defaultThrottlePeriod;
private final TimeValue maxStopTimeout;
private final ConcurrentMap<String, WatchExecution> currentExecutions = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private volatile CurrentExecutions currentExecutions = null;
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
@ -63,6 +66,7 @@ public class ExecutionService extends AbstractComponent {
this.watchLockService = watchLockService;
this.clock = clock;
this.defaultThrottlePeriod = componentSettings.getAsTime("default_throttle_period", TimeValue.timeValueSeconds(5));
maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT);
if (ExecutionService.this.defaultThrottlePeriod.millis() < 0) {
settingsValidation.addError("watcher.execution.default_throttle_period", "time value cannot be negative");
}
@ -78,6 +82,7 @@ public class ExecutionService extends AbstractComponent {
logger.debug("starting execution service");
historyStore.start();
triggeredWatchStore.start();
currentExecutions = new CurrentExecutions();
Collection<TriggeredWatch> records = triggeredWatchStore.loadTriggeredWatches(state);
executeRecords(records);
logger.debug("started execution service");
@ -95,6 +100,8 @@ public class ExecutionService extends AbstractComponent {
// this is a forceful shutdown that also interrupts the worker threads in the threadpool
List<Runnable> cancelledTasks = new ArrayList<>();
executor.queue().drainTo(cancelledTasks);
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
triggeredWatchStore.stop();
historyStore.stop();
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size());
@ -120,7 +127,7 @@ public class ExecutionService extends AbstractComponent {
public List<WatchExecutionSnapshot> currentExecutions() {
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
for (WatchExecution watchExecution : this.currentExecutions.values()) {
for (WatchExecution watchExecution : this.currentExecutions) {
currentExecutions.add(watchExecution.createSnapshot());
}
// Lets show the longest running watch first: