diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 968a37dcc57..8b5a9381e37 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -81,6 +81,7 @@ public class ExecutionService extends AbstractComponent { } if (started.compareAndSet(false, true)) { logger.debug("starting execution service"); + historyStore.start(); executeRecords(records); logger.debug("started execution service"); } @@ -94,8 +95,10 @@ public class ExecutionService extends AbstractComponent { // this is a forceful shutdown that also interrupts the worker threads in the threadpool List cancelledTasks = new ArrayList<>(); executor.queue().drainTo(cancelledTasks); + historyStore.stop(); logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); logger.debug("stopped execution service"); + } } @@ -113,8 +116,13 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) throws IOException { WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent()); - WatchExecution execution = executeInner(ctx); - watchRecord.seal(execution); + WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().name()); + try { + WatchExecution execution = executeInner(ctx); + watchRecord.seal(execution); + } finally { + lock.release(); + } if (ctx.recordInHistory()) { historyStore.put(watchRecord); } @@ -249,7 +257,9 @@ public class ExecutionService extends AbstractComponent { @Override public void run() { + logger.info("Running [{}] [{}]", ctx.watch().name(), ctx.id()); if (!started.get()) { + logger.warn("Rejecting execution due to service is not started"); logger.debug("can't initiate watch execution as execution service is not started, ignoring it..."); return; } @@ -264,20 +274,21 @@ public class ExecutionService extends AbstractComponent { } } catch (Exception e) { if (started()) { - logger.warn("failed to execute watch [{}]", e, watchRecord.name()); + logger.warn("failed to execute watch [{}] [{}]", e, watchRecord.name(), ctx.id()); try { watchRecord.update(WatchRecord.State.FAILED, e.getMessage()); if (ctx.recordInHistory()) { historyStore.update(watchRecord); } } catch (Exception e2) { - logger.error("failed to update watch record [{}] failure [{}]", e2, watchRecord, e.getMessage()); + logger.error("failed to update watch record [{}] failure [{}] for [{}] [{}]", e2, watchRecord, ctx.watch().name(), ctx.id(), e.getMessage()); } } else { logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord); } } finally { lock.release(); + logger.info("Finished [{}] [{}]", ctx.watch().name(), ctx.id()); } } @@ -298,7 +309,11 @@ public class ExecutionService extends AbstractComponent { try { ExecutionService.this.executeWatch(watch, event); } catch (Exception e) { - logger.error("failed to execute watch [{}]", e, name); + if (started()) { + logger.error("failed to execute watch from SchedulerListener [{}]", e, name); + } else { + logger.error("failed to execute watch from SchedulerListener [{}] after shutdown", e, name); + } } } } diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 7886acc4eb7..1d79e6e8e30 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -9,11 +9,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.TemplateUtils; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; @@ -28,9 +27,19 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.watcher.WatcherException; +import org.elasticsearch.watcher.support.TemplateUtils; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ @@ -47,6 +56,11 @@ public class HistoryStore extends AbstractComponent { private final int scrollSize; private final TimeValue scrollTimeout; private final WatchRecord.Parser recordParser; + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock putUpdateLock = readWriteLock.readLock(); + private final Lock stopLock = readWriteLock.writeLock(); + private final AtomicBoolean started = new AtomicBoolean(false); + @Inject public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) { @@ -58,7 +72,25 @@ public class HistoryStore extends AbstractComponent { this.scrollSize = componentSettings.getAsInt("scroll.size", 100); } + public void start() { + started.set(true); + } + + public void stop() { + stopLock.lock(); //This will block while put or update actions are underway + try { + started.set(false); + } finally { + stopLock.unlock(); + } + + } + public void put(WatchRecord watchRecord) throws HistoryException { + putUpdateLock.lock(); + if (!started.get()) { + throw new HistoryException("unable to persist watch record history store is not ready"); + } String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); try { IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) @@ -68,11 +100,16 @@ public class HistoryStore extends AbstractComponent { watchRecord.version(response.getVersion()); } catch (IOException e) { throw new HistoryException("failed to persist watch record [" + watchRecord + "]", e); + } finally { + putUpdateLock.unlock(); } } public void update(WatchRecord watchRecord) throws HistoryException { - logger.debug("updating watch record [{}]...", watchRecord); + putUpdateLock.lock(); + if (!started.get()) { + throw new HistoryException("unable to persist watch record history store is not ready"); + } try { BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes(); IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value()) @@ -83,6 +120,8 @@ public class HistoryStore extends AbstractComponent { logger.debug("successfully updated watch record [{}]", watchRecord); } catch (IOException e) { throw new HistoryException("failed to update watch record [" + watchRecord + "]", e); + } finally { + putUpdateLock.unlock(); } } diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index f015c5f4552..80aa5d3b91b 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -66,6 +66,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { templateUtils = mock(TemplateUtils.class); parser = mock(WatchRecord.Parser.class); historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser); + historyStore.start(); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java index fb29a92b3be..fa0fd39f9e5 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java @@ -30,11 +30,6 @@ import static org.hamcrest.Matchers.*; */ public class WatchCrudTests extends AbstractWatcherIntegrationTests { - @Override - protected boolean timeWarped() { - return true; - } - @Test @Repeat(iterations = 10) public void testPut() throws Exception { ensureWatcherStarted();