Merge branch 'master' into upgrade_to_es_20

Conflicts:
	src/main/java/org/elasticsearch/watcher/execution/CurrentExecutions.java
	src/main/java/org/elasticsearch/watcher/watch/WatchLockService.java
	src/test/java/org/elasticsearch/watcher/actions/throttler/ActionThrottleTests.java

Original commit: elastic/x-pack-elasticsearch@ab51f0104f
This commit is contained in:
Simon Willnauer 2015-06-17 15:56:45 +02:00
commit de39879558
5 changed files with 38 additions and 14 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.support.WatcherInactiveException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@ -26,7 +27,7 @@ public class CurrentExecutions implements Iterable<ExecutionService.WatchExecuti
try {
if (seal) {
// We shouldn't get here, because, ExecutionService#started should have been set to false
throw new IllegalStateException("put execution forbidden, because we're sealed");
throw new WatcherInactiveException("could not register execution [{}]. current executions are sealed and forbid registrations of additional executions.", id);
}
currentExecutions.put(id, execution);
} finally {

View File

@ -19,6 +19,7 @@ import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.WatcherInactiveException;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -236,9 +237,7 @@ public class ExecutionService extends AbstractComponent {
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
}
try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
if (ctx.knownWatch() && watchStore.get(ctx.watch().id()) == null) {
// fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
@ -251,14 +250,12 @@ public class ExecutionService extends AbstractComponent {
watchStore.updateStatus(ctx.watch());
}
}
} catch (Exception e) {
String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}], failure [{}]", ctx.id(), detailedMessage);
record = ctx.abortFailedExecution(detailedMessage);
} finally {
if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
try {
historyStore.put(record);
@ -266,13 +263,11 @@ public class ExecutionService extends AbstractComponent {
logger.error("failed to update watch record [{}]", e, ctx.id());
}
}
try {
triggeredWatchStore.delete(ctx.id());
} catch (Exception e) {
logger.error("failed to delete triggered watch [{}]", e, ctx.id());
}
currentExecutions.remove(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
@ -280,7 +275,6 @@ public class ExecutionService extends AbstractComponent {
lock.release();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
}
return record;
}
@ -374,7 +368,15 @@ public class ExecutionService extends AbstractComponent {
@Override
public void run() {
try {
execute(ctx);
} catch (WatcherInactiveException e) {
// When can end up here when acquiring the lock or adding a watch to the current executions while shutting down.
// Once we a watch is added to the current executions we shouldn't end up here.
logger.debug("could not execute watch [{}]/[{}]. watcher is not active", e, ctx.watch().id(), ctx.id());
} catch (Exception e) {
logger.error("could not execute watch [{}]/[{}]", e, ctx.watch().id(), ctx.id());
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.watcher.WatcherException;
public class WatcherInactiveException extends WatcherException {
public WatcherInactiveException(String msg, Object... args) {
super(msg, args);
}
public WatcherInactiveException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -11,6 +11,7 @@ import org.joda.time.PeriodType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.WatcherInactiveException;
import org.elasticsearch.watcher.support.concurrent.FairKeyedLock;
import java.util.concurrent.TimeUnit;
@ -41,7 +42,7 @@ public class WatchLockService extends AbstractComponent {
public Lock acquire(String name) {
if (!running.get()) {
throw new IllegalStateException("not started");
throw new WatcherInactiveException("not started");
}
watchLocks.acquire(name);
@ -50,7 +51,7 @@ public class WatchLockService extends AbstractComponent {
public Lock tryAcquire(String name, TimeValue timeout) {
if (!running.get()) {
throw new IllegalStateException("not started");
throw new WatcherInactiveException("not started");
}
try {
if (!watchLocks.tryAcquire(name, timeout.millis(), TimeUnit.MILLISECONDS)) {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.watcher.actions.throttler;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticsearchException;
import org.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
@ -258,7 +259,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
public void testWatchThrottlePeriod() throws Exception {
WatchSourceBuilder watchSourceBuilder = watchBuilder()
.trigger(schedule(interval("60m")))
.defaultThrottlePeriod(new TimeValue(1, TimeUnit.SECONDS));
.defaultThrottlePeriod(new TimeValue(20, TimeUnit.SECONDS));
AvailableAction availableAction = randomFrom(AvailableAction.values());
watchSourceBuilder.addAction("default_global_throttle", availableAction.action());
@ -294,7 +295,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(resultStatus.toString(), equalTo("throttled"));
if (timeWarped()) {
timeWarp().clock().fastForwardSeconds(1);
timeWarp().clock().fastForwardSeconds(20);
}
assertBusy(new Runnable() {
@Override
@ -313,7 +314,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTests {
throw new ElasticsearchException("failed to execute", ioe);
}
}
}, 1, TimeUnit.SECONDS);
}, 20, TimeUnit.SECONDS);
}
@Test @Slow