Changes after review.

Add tests for stopped history store.

Original commit: elastic/x-pack-elasticsearch@e496891ed5
This commit is contained in:
Brian Murphy 2015-04-16 14:14:56 -04:00
parent 3e7b668406
commit c6b5507436
3 changed files with 44 additions and 7 deletions

View File

@ -98,7 +98,6 @@ public class ExecutionService extends AbstractComponent {
historyStore.stop();
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size());
logger.debug("stopped execution service");
}
}
@ -257,12 +256,11 @@ public class ExecutionService extends AbstractComponent {
@Override
public void run() {
logger.info("Running [{}] [{}]", ctx.watch().name(), ctx.id());
if (!started.get()) {
logger.warn("Rejecting execution due to service is not started");
logger.debug("can't initiate watch execution as execution service is not started, ignoring it...");
return;
}
logger.trace("executing [{}] [{}]", ctx.watch().name(), ctx.id());
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().name());
try {
watchRecord.update(WatchRecord.State.CHECKING, null);
@ -288,7 +286,7 @@ public class ExecutionService extends AbstractComponent {
}
} finally {
lock.release();
logger.info("Finished [{}] [{}]", ctx.watch().name(), ctx.id());
logger.trace("finished [{}] [{}]", ctx.watch().name(), ctx.id());
}
}

View File

@ -61,7 +61,6 @@ public class HistoryStore extends AbstractComponent {
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) {
super(settings);
@ -87,11 +86,11 @@ public class HistoryStore extends AbstractComponent {
}
public void put(WatchRecord watchRecord) throws HistoryException {
putUpdateLock.lock();
if (!started.get()) {
throw new HistoryException("unable to persist watch record history store is not ready");
}
String index = getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
putUpdateLock.lock();
try {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(XContentFactory.jsonBuilder().value(watchRecord))
@ -106,10 +105,10 @@ public class HistoryStore extends AbstractComponent {
}
public void update(WatchRecord watchRecord) throws HistoryException {
putUpdateLock.lock();
if (!started.get()) {
throw new HistoryException("unable to persist watch record history store is not ready");
}
putUpdateLock.lock();
try {
BytesReference bytes = XContentFactory.jsonBuilder().value(watchRecord).bytes();
IndexRequest request = new IndexRequest(getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()), DOC_TYPE, watchRecord.id().value())

View File

@ -110,6 +110,46 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
assertThat(watchRecord.version(), equalTo(version));
}
@Test(expected = HistoryException.class)
public void testPut_stopped() {
Watch watch = mock(Watch.class);
when(watch.name()).thenReturn("_name");
when(watch.condition()).thenReturn(new AlwaysTrueCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
historyStore.stop();
try {
historyStore.put(watchRecord);
} finally {
historyStore.start();
}
fail();
}
@Test(expected = HistoryException.class)
public void testUpdate_stopped() throws Exception {
Watch watch = mock(Watch.class);
when(watch.name()).thenReturn("_name");
when(watch.condition()).thenReturn(new AlwaysTrueCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
historyStore.stop();
try {
historyStore.update(watchRecord);
} finally {
historyStore.start();
}
fail();
}
@Test
public void testLoadWatchRecords_noPriorHistoryIndices() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name"));