From c6b55074369cae59ec5d48e057616006cfe612af Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Thu, 16 Apr 2015 14:14:56 -0400 Subject: [PATCH] Changes after review. Add tests for stopped history store. Original commit: elastic/x-pack-elasticsearch@e496891ed5720d5ebdb9d81e7d74c38e4f0b5c0b --- .../watcher/execution/ExecutionService.java | 6 +-- .../watcher/history/HistoryStore.java | 5 +-- .../watcher/history/HistoryStoreTests.java | 40 +++++++++++++++++++ 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 8b5a9381e37..234947c9ed5 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -98,7 +98,6 @@ public class ExecutionService extends AbstractComponent { historyStore.stop(); logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); logger.debug("stopped execution service"); - } } @@ -257,12 +256,11 @@ public class ExecutionService extends AbstractComponent { @Override public void run() { - logger.info("Running [{}] [{}]", ctx.watch().name(), ctx.id()); if (!started.get()) { - logger.warn("Rejecting execution due to service is not started"); logger.debug("can't initiate watch execution as execution service is not started, ignoring it..."); return; } + logger.trace("executing [{}] [{}]", ctx.watch().name(), ctx.id()); WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().name()); try { watchRecord.update(WatchRecord.State.CHECKING, null); @@ -288,7 +286,7 @@ public class ExecutionService extends AbstractComponent { } } finally { lock.release(); - logger.info("Finished [{}] [{}]", ctx.watch().name(), ctx.id()); + logger.trace("finished [{}] [{}]", ctx.watch().name(), ctx.id()); } } diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 1d79e6e8e30..6954f9c122d 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -61,7 +61,6 @@ public class HistoryStore extends AbstractComponent { private final Lock stopLock = readWriteLock.writeLock(); private final AtomicBoolean started = new AtomicBoolean(false); - @Inject public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) { super(settings); @@ -87,11 +86,11 @@ public class HistoryStore extends AbstractComponent { } public void put(WatchRecord watchRecord) throws HistoryException { - putUpdateLock.lock(); if (!started.get()) { throw new HistoryException("unable to persist watch record history store is not ready"); } String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + putUpdateLock.lock(); try { IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) .source(XContentFactory.jsonBuilder().value(watchRecord)) @@ -106,10 +105,10 @@ public class HistoryStore extends AbstractComponent { } public void update(WatchRecord watchRecord) throws HistoryException { - putUpdateLock.lock(); if (!started.get()) { throw new HistoryException("unable to persist watch record history store is not ready"); } + putUpdateLock.lock(); try { BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes(); IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value()) diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index 80aa5d3b91b..131a23597b0 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -110,6 +110,46 @@ public class HistoryStoreTests extends ElasticsearchTestCase { assertThat(watchRecord.version(), equalTo(version)); } + @Test(expected = HistoryException.class) + public void testPut_stopped() { + Watch watch = mock(Watch.class); + when(watch.name()).thenReturn("_name"); + when(watch.condition()).thenReturn(new AlwaysTrueCondition(logger)); + when(watch.input()).thenReturn(null); + when(watch.metadata()).thenReturn(null); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); + WatchRecord watchRecord = new WatchRecord(wid, watch, event); + + historyStore.stop(); + try { + historyStore.put(watchRecord); + } finally { + historyStore.start(); + } + fail(); + } + + @Test(expected = HistoryException.class) + public void testUpdate_stopped() throws Exception { + Watch watch = mock(Watch.class); + when(watch.name()).thenReturn("_name"); + when(watch.condition()).thenReturn(new AlwaysTrueCondition(logger)); + when(watch.input()).thenReturn(null); + when(watch.metadata()).thenReturn(null); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC)); + WatchRecord watchRecord = new WatchRecord(wid, watch, event); + + historyStore.stop(); + try { + historyStore.update(watchRecord); + } finally { + historyStore.start(); + } + fail(); + } + @Test public void testLoadWatchRecords_noPriorHistoryIndices() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name"));