From 2377d1525bf0233c3777572da6ae2ee28839d220 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 17 Mar 2015 17:27:40 -0700 Subject: [PATCH] Tests: added tests for AlertStore, AlertService and AlertLockService Changed ClientProxy to be return responses instead of ActionFutures and removed builders. This helps with mocking. Original commit: elastic/x-pack-elasticsearch@bfc36d9405190ca19cc73652027ff338a9d623c5 --- .../alerts/AlertLockService.java | 14 +- .../org/elasticsearch/alerts/AlertsStore.java | 44 +-- .../alerts/history/HistoryStore.java | 14 +- .../alerts/input/search/SearchInput.java | 2 +- .../support/init/proxy/ClientProxy.java | 22 +- .../alerts/transform/SearchTransform.java | 2 +- .../alerts/AlertLockServiceTests.java | 57 ++++ .../alerts/AlertServiceTests.java | 171 ++++++++++ .../elasticsearch/alerts/AlertStoreTests.java | 306 ++++++++++++++++++ .../alerts/history/HistoryStoreTests.java | 74 ++--- 10 files changed, 613 insertions(+), 93 deletions(-) create mode 100644 src/test/java/org/elasticsearch/alerts/AlertLockServiceTests.java create mode 100644 src/test/java/org/elasticsearch/alerts/AlertServiceTests.java create mode 100644 src/test/java/org/elasticsearch/alerts/AlertStoreTests.java diff --git a/src/main/java/org/elasticsearch/alerts/AlertLockService.java b/src/main/java/org/elasticsearch/alerts/AlertLockService.java index 74518980639..97577ebead0 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertLockService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertLockService.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.alerts; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.util.concurrent.KeyedLock; import java.util.concurrent.atomic.AtomicBoolean; @@ -15,10 +16,13 @@ import java.util.concurrent.atomic.AtomicBoolean; public class AlertLockService { private final KeyedLock alertLock = new KeyedLock<>(); - - private AtomicBoolean running = new AtomicBoolean(); + private final AtomicBoolean running = new AtomicBoolean(false); public Lock acquire(String name) { + if (!running.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + alertLock.acquire(name); return new Lock(name, alertLock); } @@ -38,7 +42,7 @@ public class AlertLockService { // ongoing operations to complete. Resulting in once the alert service starts again that more than // expected alert action entries are processed. // - // Note: new operations will fail now because the state has been set to: stopping + // Note: new operations will fail now because the running has been set to false while (alertLock.hasLockedKeys()) { try { Thread.sleep(100); @@ -48,6 +52,10 @@ public class AlertLockService { } } + KeyedLock getAlertLock() { + return alertLock; + } + public static class Lock { private final String name; diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 64385e72beb..192e29de12a 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -13,8 +13,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.search.*; import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.alerts.support.TemplateUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; @@ -31,10 +30,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -93,7 +92,7 @@ public class AlertsStore extends AbstractComponent { if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX); try { - int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap); + int count = loadAlerts(alertIndexMetaData.numberOfShards()); logger.debug("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); } catch (Exception e) { logger.debug("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); @@ -226,42 +225,43 @@ public class AlertsStore extends AbstractComponent { * scrolls all the alert documents in the alerts index, parses them, and loads them into * the given map. */ - static int loadAlerts(ClientProxy client, int scrollSize, TimeValue scrollTimeout, int numPrimaryShards, Alert.Parser parser, Map alerts) { - assert alerts.isEmpty() : "no alerts should reside, but there are [" + alerts.size() + "] alerts."; - RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet(); + int loadAlerts(int numPrimaryShards) { + assert alertMap.isEmpty() : "no alerts should reside, but there are [" + alertMap.size() + "] alerts."; + RefreshResponse refreshResponse = client.refresh(new RefreshRequest(ALERT_INDEX)); if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { throw new AlertsException("not all required shards have been refreshed"); } int count = 0; - SearchResponse response = client.prepareSearch(ALERT_INDEX) - .setTypes(ALERT_TYPE) - .setPreference("_primary") - .setSearchType(SearchType.SCAN) - .setScroll(scrollTimeout) - .setSize(scrollSize) - .setVersion(true) - .get(); + SearchRequest searchRequest = new SearchRequest(ALERT_INDEX) + .types(ALERT_TYPE) + .preference("_primary") + .searchType(SearchType.SCAN) + .scroll(scrollTimeout) + .source(new SearchSourceBuilder() + .size(scrollSize) + .version(true)); + SearchResponse response = client.search(searchRequest); try { if (response.getTotalShards() != response.getSuccessfulShards()) { throw new ElasticsearchException("Partial response while loading alerts"); } if (response.getHits().getTotalHits() > 0) { - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + response = client.searchScroll(response.getScrollId(), scrollTimeout); while (response.getHits().hits().length != 0) { for (SearchHit hit : response.getHits()) { String name = hit.getId(); - Alert alert = parser.parse(name, true, hit.getSourceRef()); + Alert alert = alertParser.parse(name, true, hit.getSourceRef()); alert.status().version(hit.version()); - alerts.put(name, alert); + alertMap.put(name, alert); count++; } - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + response = client.searchScroll(response.getScrollId(), scrollTimeout); } } } finally { - client.prepareClearScroll().addScrollId(response.getScrollId()).get(); + client.clearScroll(response.getScrollId()); } return count; } @@ -272,7 +272,7 @@ public class AlertsStore extends AbstractComponent { } } - public final class AlertPut { + public class AlertPut { private final Alert previous; private final Alert current; @@ -297,7 +297,7 @@ public class AlertsStore extends AbstractComponent { } } - public final class AlertDelete { + public class AlertDelete { private final DeleteResponse response; diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java index 095175bf3a2..09c140e581c 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java @@ -9,9 +9,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.support.TemplateUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; @@ -109,13 +107,13 @@ public class HistoryStore extends AbstractComponent { } } - RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet(); + RefreshResponse refreshResponse = client.refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")); if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { return new LoadResult(false); } SearchRequest searchRequest = createScanSearchRequest(firedAlertState); - SearchResponse response = client.search(searchRequest).actionGet(); + SearchResponse response = client.search(searchRequest); List alerts = new ArrayList<>(); try { if (response.getTotalShards() != response.getSuccessfulShards()) { @@ -123,7 +121,7 @@ public class HistoryStore extends AbstractComponent { } if (response.getHits().getTotalHits() > 0) { - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + response = client.searchScroll(response.getScrollId(), scrollTimeout); while (response.getHits().hits().length != 0) { for (SearchHit sh : response.getHits()) { String historyId = sh.getId(); @@ -132,11 +130,11 @@ public class HistoryStore extends AbstractComponent { logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id()); alerts.add(historyEntry); } - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + response = client.searchScroll(response.getScrollId(), scrollTimeout); } } } finally { - client.prepareClearScroll().addScrollId(response.getScrollId()).get(); + client.clearScroll(response.getScrollId()); } templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); return new LoadResult(true, alerts); diff --git a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java index e3dd16baece..d3214d6f38d 100644 --- a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java +++ b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java @@ -74,7 +74,7 @@ public class SearchInput extends Input { } // actionGet deals properly with InterruptedException - SearchResponse response = client.search(request).actionGet(); + SearchResponse response = client.search(request); if (logger.isDebugEnabled()) { logger.debug("[{}] found [{}] hits", ctx.id(), ctx.alert().name(), response.getHits().getTotalHits()); for (SearchHit hit : response.getHits()) { diff --git a/src/main/java/org/elasticsearch/alerts/support/init/proxy/ClientProxy.java b/src/main/java/org/elasticsearch/alerts/support/init/proxy/ClientProxy.java index 5195f3b7793..137267ade46 100644 --- a/src/main/java/org/elasticsearch/alerts/support/init/proxy/ClientProxy.java +++ b/src/main/java/org/elasticsearch/alerts/support/init/proxy/ClientProxy.java @@ -7,6 +7,8 @@ package org.elasticsearch.alerts.support.init.proxy; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; @@ -18,6 +20,7 @@ import org.elasticsearch.alerts.support.init.InitializingService; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.unit.TimeValue; /** * A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client @@ -65,20 +68,23 @@ public class ClientProxy implements InitializingService.Initializable { return client.prepareGet(index, type, id); } - public ActionFuture search(SearchRequest request) { - return client.search(request); + public SearchResponse search(SearchRequest request) { + return client.search(request).actionGet(); } - public SearchRequestBuilder prepareSearch(String... indices) { - return client.prepareSearch(indices); + public SearchResponse searchScroll(String scrollId, TimeValue timeout) { + SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout); + return client.searchScroll(request).actionGet(); } - public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) { - return client.prepareSearchScroll(scrollId); + public ClearScrollResponse clearScroll(String scrollId) { + ClearScrollRequest request = new ClearScrollRequest(); + request.addScrollId(scrollId); + return client.clearScroll(request).actionGet(); } - public ClearScrollRequestBuilder prepareClearScroll() { - return client.prepareClearScroll(); + public RefreshResponse refresh(RefreshRequest request) { + return client.admin().indices().refresh(request).actionGet(); } } diff --git a/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java b/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java index 8ceb0e9ac4d..980bd71c4b3 100644 --- a/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java +++ b/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java @@ -63,7 +63,7 @@ public class SearchTransform extends Transform { @Override public Result apply(ExecutionContext ctx, Payload payload) throws IOException { SearchRequest req = createRequest(request, ctx, payload); - SearchResponse resp = client.search(req).actionGet(); + SearchResponse resp = client.search(req); return new Result(TYPE, new Payload.XContent(resp)); } diff --git a/src/test/java/org/elasticsearch/alerts/AlertLockServiceTests.java b/src/test/java/org/elasticsearch/alerts/AlertLockServiceTests.java new file mode 100644 index 00000000000..efb6feb9077 --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/AlertLockServiceTests.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + */ +public class AlertLockServiceTests extends ElasticsearchTestCase { + + @Test + public void testLocking_notStarted() { + AlertLockService lockService = new AlertLockService(); + try { + lockService.acquire("_name"); + fail("exception expected"); + } catch (Exception e) { + assertThat(e.getMessage(), equalTo("not started")); + } + } + + @Test + public void testLocking() { + AlertLockService lockService = new AlertLockService(); + lockService.start(); + AlertLockService.Lock lock = lockService.acquire("_name"); + assertThat(lockService.getAlertLock().hasLockedKeys(), is(true)); + lock.release(); + assertThat(lockService.getAlertLock().hasLockedKeys(), is(false)); + lockService.stop(); + } + + @Test + public void testLocking_alreadyHeld() { + AlertLockService lockService = new AlertLockService(); + lockService.start(); + AlertLockService.Lock lock1 = lockService.acquire("_name"); + try { + lockService.acquire("_name"); + fail("exception expected"); + } catch (ElasticsearchIllegalStateException e) { + assertThat(e.getMessage(), containsString("Lock already acquired")); + } + lock1.release(); + lockService.stop(); + } + +} diff --git a/src/test/java/org/elasticsearch/alerts/AlertServiceTests.java b/src/test/java/org/elasticsearch/alerts/AlertServiceTests.java new file mode 100644 index 00000000000..ff020690dd6 --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/AlertServiceTests.java @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.alerts.history.HistoryService; +import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.scheduler.schedule.Schedule; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + */ +public class AlertServiceTests extends ElasticsearchTestCase { + + private Scheduler scheduler; + private AlertsStore alertsStore; + private AlertsService alertsService; + private HistoryService historyService; + private AlertLockService alertLockService; + + @Before + public void init() throws Exception { + scheduler = mock(Scheduler.class); + alertsStore = mock(AlertsStore.class); + historyService = mock(HistoryService.class); + alertLockService = mock(AlertLockService.class); + alertsService = new AlertsService(ImmutableSettings.EMPTY, scheduler, alertsStore, historyService, alertLockService); + Field field = AlertsService.class.getDeclaredField("state"); + field.setAccessible(true); + AtomicReference state = (AtomicReference) field.get(alertsService); + state.set(AlertsService.State.STARTED); + } + + @Test + public void testPutAlert() { + IndexResponse indexResponse = mock(IndexResponse.class); + Alert alert = mock(Alert.class); + AlertsStore.AlertPut alertPut = mock(AlertsStore.AlertPut.class); + when(alertPut.indexResponse()).thenReturn(indexResponse); + when(alertPut.current()).thenReturn(alert); + + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire(any(String.class))).thenReturn(lock); + when(alertsStore.putAlert(any(String.class), any(BytesReference.class))).thenReturn(alertPut); + IndexResponse response = alertsService.putAlert("_name", new BytesArray("{}")); + assertThat(response, sameInstance(indexResponse)); + + verify(scheduler, times(1)).add(any(Scheduler.Job.class)); + } + + @Test + public void testPutAlert_notSchedule() { + Schedule schedule = mock(Schedule.class); + + IndexResponse indexResponse = mock(IndexResponse.class); + Alert alert = mock(Alert.class); + when(alert.schedule()).thenReturn(schedule); + AlertsStore.AlertPut alertPut = mock(AlertsStore.AlertPut.class); + when(alertPut.indexResponse()).thenReturn(indexResponse); + when(alertPut.current()).thenReturn(alert); + Alert previousAlert = mock(Alert.class); + when(previousAlert.schedule()).thenReturn(schedule); + when(alertPut.previous()).thenReturn(previousAlert); + + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire(any(String.class))).thenReturn(lock); + when(alertsStore.putAlert(any(String.class), any(BytesReference.class))).thenReturn(alertPut); + IndexResponse response = alertsService.putAlert("_name", new BytesArray("{}")); + assertThat(response, sameInstance(indexResponse)); + + verifyZeroInteractions(scheduler); + } + + @Test + public void testDeleteAlert() throws Exception { + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire("_name")).thenReturn(lock); + + AlertsStore.AlertDelete expectedAlertDelete = mock(AlertsStore.AlertDelete.class); + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponse.isFound()).thenReturn(true); + when(expectedAlertDelete.deleteResponse()).thenReturn(deleteResponse); + when(alertsStore.deleteAlert("_name")).thenReturn(expectedAlertDelete); + AlertsStore.AlertDelete alertDelete = alertsService.deleteAlert("_name"); + + assertThat(alertDelete, sameInstance(expectedAlertDelete)); + verify(scheduler, times(1)).remove("_name"); + } + + @Test + public void testDeleteAlert_notFound() throws Exception { + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire("_name")).thenReturn(lock); + + AlertsStore.AlertDelete expectedAlertDelete = mock(AlertsStore.AlertDelete.class); + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponse.isFound()).thenReturn(false); + when(expectedAlertDelete.deleteResponse()).thenReturn(deleteResponse); + when(alertsStore.deleteAlert("_name")).thenReturn(expectedAlertDelete); + AlertsStore.AlertDelete alertDelete = alertsService.deleteAlert("_name"); + + assertThat(alertDelete, sameInstance(expectedAlertDelete)); + verifyZeroInteractions(scheduler); + } + + @Test + public void testAckAlert() throws Exception { + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire("_name")).thenReturn(lock); + Alert alert = mock(Alert.class); + when(alert.ack()).thenReturn(true); + Alert.Status status = new Alert.Status(); + when(alert.status()).thenReturn(status); + when(alertsStore.getAlert("_name")).thenReturn(alert); + + Alert.Status result = alertsService.ackAlert("_name"); + assertThat(result, not(sameInstance(status))); + + verify(alertsStore, times(1)).updateAlertStatus(alert); + } + + @Test + public void testAckAlert_notAck() throws Exception { + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire("_name")).thenReturn(lock); + Alert alert = mock(Alert.class); + when(alert.ack()).thenReturn(false); + Alert.Status status = new Alert.Status(); + when(alert.status()).thenReturn(status); + when(alertsStore.getAlert("_name")).thenReturn(alert); + + Alert.Status result = alertsService.ackAlert("_name"); + assertThat(result, not(sameInstance(status))); + + verify(alertsStore, never()).updateAlertStatus(alert); + } + + @Test + public void testAckAlert_noAlert() throws Exception { + AlertLockService.Lock lock = mock(AlertLockService.Lock.class); + when(alertLockService.acquire("_name")).thenReturn(lock); + when(alertsStore.getAlert("_name")).thenReturn(null); + + try { + alertsService.ackAlert("_name"); + fail(); + } catch (AlertsException e) { + // expected + } + + verify(alertsStore, never()).updateAlertStatus(any(Alert.class)); + } + +} diff --git a/src/test/java/org/elasticsearch/alerts/AlertStoreTests.java b/src/test/java/org/elasticsearch/alerts/AlertStoreTests.java new file mode 100644 index 00000000000..e97f0a1a9ee --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/AlertStoreTests.java @@ -0,0 +1,306 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.*; +import org.elasticsearch.alerts.support.Callback; +import org.elasticsearch.alerts.support.TemplateUtils; +import org.elasticsearch.alerts.support.init.proxy.ClientProxy; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.StringText; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + */ +public class AlertStoreTests extends ElasticsearchTestCase { + + private AlertsStore alertsStore; + private ClientProxy clientProxy; + private TemplateUtils templateUtils; + private Alert.Parser parser; + private ClusterService clusterService; + private ThreadPool threadPool; + private Callback callback; + + @Before + public void init() { + clientProxy = mock(ClientProxy.class); + templateUtils = mock(TemplateUtils.class); + parser = mock(Alert.Parser.class); + clusterService = mock(ClusterService.class); + threadPool = mock(ThreadPool.class); + callback = mock(Callback.class); + alertsStore = new AlertsStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, clusterService, threadPool); + } + + @Test + public void testStartNoPreviousAlertsIndex() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + csBuilder.metaData(metaDateBuilder); + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + assertThat(alertsStore.started(), is(true)); + assertThat(alertsStore.getAlerts().size(), equalTo(0)); + verify(callback, times(1)).onSuccess(any(ClusterState.class)); + verify(callback, never()).onFailure(any(Throwable.class)); + verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts"); + verifyZeroInteractions(clientProxy); + verifyZeroInteractions(clusterService); + + alertsStore.start(cs, callback); + verify(callback, times(2)).onSuccess(any(ClusterState.class)); + verify(callback, never()).onFailure(any(Throwable.class)); + verifyNoMoreInteractions(templateUtils); + verifyZeroInteractions(clientProxy); + verifyZeroInteractions(clusterService); + } + + @Test + public void testStartPrimaryShardNotReady() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false) + .addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.UNASSIGNED, 1)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder); + + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); + verifyZeroInteractions(callback); + verifyZeroInteractions(templateUtils); + verifyZeroInteractions(clientProxy); + } + + @Test + public void testStartRefreshFailed() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false) + .addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 0); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); + verifyZeroInteractions(callback); + verifyZeroInteractions(templateUtils); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, never()).search(any(SearchRequest.class)); + verify(clientProxy, never()).clearScroll(anyString()); + } + + @Test + public void testStartSearchFailed() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false) + .addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + SearchResponse searchResponse = mockSearchResponse(1, 0, 0); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); + verifyZeroInteractions(callback); + verifyZeroInteractions(templateUtils); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(1)).clearScroll(anyString()); + } + + @Test + public void testStartNoAlertStored() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false) + .addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + SearchResponse searchResponse = mockSearchResponse(1, 1, 0); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + verifyZeroInteractions(clusterService); + assertThat(alertsStore.started(), is(true)); + assertThat(alertsStore.getAlerts().size(), equalTo(0)); + verify(callback, times(1)).onSuccess(any(ClusterState.class)); + verify(callback, never()).onFailure(any(Throwable.class)); + verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts"); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(1)).clearScroll(anyString()); + } + + @Test + public void testStartAlertStored() { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false) + .addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDateBuilder); + csBuilder.routingTable(routingTableBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new StringText("type"), Collections.emptyMap()); + hit1.sourceRef(source); + InternalSearchHit hit2 = new InternalSearchHit(1, "_id2", new StringText("type"), Collections.emptyMap()); + hit2.sourceRef(source); + SearchResponse searchResponse2 = mockSearchResponse(1, 1, 2, hit1, hit2); + SearchResponse searchResponse3 = mockSearchResponse(1, 1, 2); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3); + + Alert alert1 = mock(Alert.class); + Alert.Status status = new Alert.Status(); + when(alert1.status()).thenReturn(status); + Alert alert2 = mock(Alert.class); + when(alert2.status()).thenReturn(status); + when(parser.parse("_id1", true, source)).thenReturn(alert1); + when(parser.parse("_id2", true, source)).thenReturn(alert2); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + + alertsStore.start(cs, callback); + verifyZeroInteractions(clusterService); + assertThat(alertsStore.started(), is(true)); + assertThat(alertsStore.getAlerts().size(), equalTo(2)); + verify(callback, times(1)).onSuccess(any(ClusterState.class)); + verify(callback, never()).onFailure(any(Throwable.class)); + verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts"); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class)); + verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class)); + verify(clientProxy, times(1)).clearScroll(anyString()); + } + + private RefreshResponse mockRefreshResponse(int total, int successful) { + RefreshResponse refreshResponse = mock(RefreshResponse.class); + when(refreshResponse.getTotalShards()).thenReturn(total); + when(refreshResponse.getSuccessfulShards()).thenReturn(successful); + return refreshResponse; + } + + private SearchResponse mockSearchResponse(int total, int successful, int totalHits, InternalSearchHit... hits) { + InternalSearchHits internalSearchHits = new InternalSearchHits(hits, totalHits, 1f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getTotalShards()).thenReturn(total); + when(searchResponse.getSuccessfulShards()).thenReturn(successful); + when(searchResponse.getHits()).thenReturn(internalSearchHits); + when(searchResponse.getHits()).thenReturn(internalSearchHits); + return searchResponse; + } + +} diff --git a/src/test/java/org/elasticsearch/alerts/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/alerts/history/HistoryStoreTests.java index 5e9c7ea8c0f..83b8ba57de8 100644 --- a/src/test/java/org/elasticsearch/alerts/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/alerts/history/HistoryStoreTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.alerts.history; import com.google.common.collect.ImmutableSet; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequest; @@ -17,8 +16,6 @@ import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.condition.simple.AlwaysTrueCondition; import org.elasticsearch.alerts.support.TemplateUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -46,7 +43,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; -import static org.mockito.Mockito.verifyZeroInteractions; /** */ @@ -194,13 +190,14 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - mockRefresh(0); + RefreshResponse refreshResponse = mockRefreshResponse(1, 0); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION); assertThat(result.succeeded(), is(false)); assertThat(ImmutableSet.copyOf(result).size(), equalTo(0)); verifyZeroInteractions(templateUtils); - verify(clientProxy, times(1)).admin(); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @Test @@ -225,26 +222,22 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - mockRefresh(1); + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse = mock(SearchResponse.class); when(searchResponse.getSuccessfulShards()).thenReturn(0); when(searchResponse.getTotalShards()).thenReturn(1); - ActionFuture actionFuture = mock(ActionFuture.class); - when(actionFuture.actionGet()).thenReturn(searchResponse); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); - ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class); - when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder); - when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1)); - when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder); + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION); assertThat(result.succeeded(), is(false)); assertThat(ImmutableSet.copyOf(result).size(), equalTo(0)); verifyZeroInteractions(templateUtils); - verify(clientProxy, times(1)).admin(); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @Test @@ -269,27 +262,23 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - mockRefresh(1); + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse = mock(SearchResponse.class); when(searchResponse.getSuccessfulShards()).thenReturn(1); when(searchResponse.getTotalShards()).thenReturn(1); when(searchResponse.getHits()).thenReturn(InternalSearchHits.empty()); - ActionFuture actionFuture = mock(ActionFuture.class); - when(actionFuture.actionGet()).thenReturn(searchResponse); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse); - ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class); - when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder); - when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1)); - when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder); + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION); assertThat(result.succeeded(), is(true)); assertThat(ImmutableSet.copyOf(result).size(), equalTo(0)); verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerthistory"); - verify(clientProxy, times(1)).admin(); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @Test @@ -314,7 +303,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - mockRefresh(1); + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); SearchResponse searchResponse1 = mock(SearchResponse.class); when(searchResponse1.getSuccessfulShards()).thenReturn(1); @@ -325,38 +315,27 @@ public class HistoryStoreTests extends ElasticsearchTestCase { hit.sourceRef(new BytesArray("{}")); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit}, 1, 1.0f); when(searchResponse1.getHits()).thenReturn(hits); - ActionFuture actionFuture = mock(ActionFuture.class); - when(actionFuture.actionGet()).thenReturn(searchResponse1); - when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture); + when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1); // First return a scroll response with a single hit - SearchScrollRequestBuilder searchScrollRequestBuilder1 = mock(SearchScrollRequestBuilder.class); - when(searchScrollRequestBuilder1.setScroll(any(TimeValue.class))).thenReturn(searchScrollRequestBuilder1); - when(searchScrollRequestBuilder1.get()).thenReturn(searchResponse1); - when(clientProxy.prepareSearchScroll(anyString())).thenReturn(searchScrollRequestBuilder1); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse1); // then with no hits - SearchScrollRequestBuilder searchScrollRequestBuilder2 = mock(SearchScrollRequestBuilder.class); - when(searchScrollRequestBuilder2.setScroll(any(TimeValue.class))).thenReturn(searchScrollRequestBuilder2); SearchResponse searchResponse2 = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 1, null); - when(searchScrollRequestBuilder2.get()).thenReturn(searchResponse2); - when(clientProxy.prepareSearchScroll(anyString())).thenReturn(searchScrollRequestBuilder2); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2); FiredAlert firedAlert = mock(FiredAlert.class); when(firedAlert.state()).thenReturn(FiredAlert.State.AWAITS_EXECUTION); when(parser.parse(any(BytesReference.class), eq("_id"), eq(1l))).thenReturn(firedAlert); - ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class); - when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder); - when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1)); - when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder); + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION); assertThat(result.succeeded(), is(true)); assertThat(ImmutableSet.copyOf(result).size(), equalTo(0)); verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerthistory"); - verify(clientProxy, times(1)).admin(); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @Test @@ -367,16 +346,11 @@ public class HistoryStoreTests extends ElasticsearchTestCase { assertThat(HistoryStore.getAlertHistoryIndexNameForTime(new DateTime(2833165811000L, DateTimeZone.UTC)), equalTo(".alert_history_2059-10-12")); } - private void mockRefresh(int numSuccessfulShards) { - AdminClient adminClient = mock(AdminClient.class); - when(clientProxy.admin()).thenReturn(adminClient); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(adminClient.indices()).thenReturn(indicesAdminClient); - ActionFuture actionFuture = mock(ActionFuture.class); - when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(actionFuture); + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); - when(actionFuture.actionGet()).thenReturn(refreshResponse); - when(refreshResponse.getSuccessfulShards()).thenReturn(numSuccessfulShards); + when(refreshResponse.getTotalShards()).thenReturn(total); + when(refreshResponse.getSuccessfulShards()).thenReturn(successful); + return refreshResponse; } }