We should cope with the situation when a watch is going to be executed whilst shutting down.

The execution service will throw an exection if we acquire a watch lock or add a watch to the current executions if Watcher has stopped or is stopping. We should deal with this sitation by properly catching those failures. FYI if a watch managed to put itself to the current executions a shutdown will wait until a watch execution is finished.

Original commit: elastic/x-pack-elasticsearch@5692316072
This commit is contained in:
Martijn van Groningen 2015-06-17 09:47:30 +02:00
parent 2f14240915
commit b82fca3379
4 changed files with 34 additions and 14 deletions

View File

@ -5,9 +5,8 @@
*/ */
package org.elasticsearch.watcher.execution; package org.elasticsearch.watcher.execution;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.watcher.support.WatcherInactiveException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -28,7 +27,7 @@ public class CurrentExecutions implements Iterable<ExecutionService.WatchExecuti
try { try {
if (seal) { if (seal) {
// We shouldn't get here, because, ExecutionService#started should have been set to false // We shouldn't get here, because, ExecutionService#started should have been set to false
throw new ElasticsearchIllegalStateException("put execution forbidden, because we're sealed"); throw new WatcherInactiveException("could not register execution [{}]. current executions are sealed and forbid registrations of additional executions.", id);
} }
currentExecutions.put(id, execution); currentExecutions.put(id, execution);
} finally { } finally {

View File

@ -21,6 +21,7 @@ import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.WatcherInactiveException;
import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation; import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -237,9 +238,7 @@ public class ExecutionService extends AbstractComponent {
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
} }
try { try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
if (ctx.knownWatch() && watchStore.get(ctx.watch().id()) == null) { if (ctx.knownWatch() && watchStore.get(ctx.watch().id()) == null) {
// fail fast if we are trying to execute a deleted watch // fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
@ -252,14 +251,12 @@ public class ExecutionService extends AbstractComponent {
watchStore.updateStatus(ctx.watch()); watchStore.updateStatus(ctx.watch());
} }
} }
} catch (Exception e) { } catch (Exception e) {
String detailedMessage = ExceptionsHelper.detailedMessage(e); String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}], failure [{}]", ctx.id(), detailedMessage); logger.warn("failed to execute watch [{}], failure [{}]", ctx.id(), detailedMessage);
record = ctx.abortFailedExecution(detailedMessage); record = ctx.abortFailedExecution(detailedMessage);
} finally { } finally {
if (ctx.knownWatch() && record != null && ctx.recordExecution()) { if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
try { try {
historyStore.put(record); historyStore.put(record);
@ -267,13 +264,11 @@ public class ExecutionService extends AbstractComponent {
logger.error("failed to update watch record [{}]", e, ctx.id()); logger.error("failed to update watch record [{}]", e, ctx.id());
} }
} }
try { try {
triggeredWatchStore.delete(ctx.id()); triggeredWatchStore.delete(ctx.id());
} catch (Exception e) { } catch (Exception e) {
logger.error("failed to delete triggered watch [{}]", e, ctx.id()); logger.error("failed to delete triggered watch [{}]", e, ctx.id());
} }
currentExecutions.remove(ctx.watch().id()); currentExecutions.remove(ctx.watch().id());
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
@ -281,7 +276,6 @@ public class ExecutionService extends AbstractComponent {
lock.release(); lock.release();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
} }
return record; return record;
} }
@ -375,7 +369,15 @@ public class ExecutionService extends AbstractComponent {
@Override @Override
public void run() { public void run() {
execute(ctx); try {
execute(ctx);
} catch (WatcherInactiveException e) {
// When can end up here when acquiring the lock or adding a watch to the current executions while shutting down.
// Once we a watch is added to the current executions we shouldn't end up here.
logger.debug("could not execute watch [{}]/[{}]. watcher is not active", e, ctx.watch().id(), ctx.id());
} catch (Exception e) {
logger.error("could not execute watch [{}]/[{}]", e, ctx.watch().id(), ctx.id());
}
} }
} }

View File

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.watcher.WatcherException;
public class WatcherInactiveException extends WatcherException {
public WatcherInactiveException(String msg, Object... args) {
super(msg, args);
}
public WatcherInactiveException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.watcher.watch; package org.elasticsearch.watcher.watch;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.joda.time.PeriodType;
@ -13,6 +12,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.WatcherInactiveException;
import org.elasticsearch.watcher.support.concurrent.FairKeyedLock; import org.elasticsearch.watcher.support.concurrent.FairKeyedLock;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,7 +43,7 @@ public class WatchLockService extends AbstractComponent {
public Lock acquire(String name) { public Lock acquire(String name) {
if (!running.get()) { if (!running.get()) {
throw new ElasticsearchIllegalStateException("not started"); throw new WatcherInactiveException("not started");
} }
watchLocks.acquire(name); watchLocks.acquire(name);
@ -52,7 +52,7 @@ public class WatchLockService extends AbstractComponent {
public Lock tryAcquire(String name, TimeValue timeout) { public Lock tryAcquire(String name, TimeValue timeout) {
if (!running.get()) { if (!running.get()) {
throw new ElasticsearchIllegalStateException("not started"); throw new WatcherInactiveException("not started");
} }
try { try {
if (!watchLocks.tryAcquire(name, timeout.millis(), TimeUnit.MILLISECONDS)) { if (!watchLocks.tryAcquire(name, timeout.millis(), TimeUnit.MILLISECONDS)) {