diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 438b491f1e4..79f65a65ef8 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.triggers.TriggerManager; @@ -60,12 +61,17 @@ public class AlertsStore extends AbstractComponent { private final AlertActionRegistry alertActionRegistry; private final ConcurrentMap alertMap; + private final int scrollSize; + private final TimeValue scrollTimeout; + @Inject public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) { super(settings); this.client = client; this.alertActionRegistry = alertActionRegistry; - alertMap = ConcurrentCollections.newConcurrentMap(); + this.alertMap = ConcurrentCollections.newConcurrentMap(); + this.scrollSize = componentSettings.getAsInt("scroll.size", 100); + this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); } /** @@ -172,22 +178,25 @@ public class AlertsStore extends AbstractComponent { createAlertsIndex(); } - SearchResponse searchResponse = client.prepareSearch().setSource( - "{ \"query\" : " + - "{ \"match_all\" : {}}," + - "\"size\" : \"100\"" + - "}" - ).setTypes(AlertManager.ALERT_TYPE).setIndices(AlertManager.ALERT_INDEX).execute().actionGet(); - for (SearchHit sh : searchResponse.getHits()) { - String alertId = sh.getId(); - try { - Alert alert = parseAlert(alertId, sh); - alertMap.put(alertId, alert); - } catch (ElasticsearchException e) { - logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); + SearchResponse response = client.prepareSearch() + .setSearchType(SearchType.SCAN) + .setScroll(scrollTimeout) + .setSize(scrollSize) + .setTypes(AlertManager.ALERT_TYPE) + .setIndices(AlertManager.ALERT_INDEX).get(); + try { + for (; response.getHits().hits().length != 0; response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get()) { + for (SearchHit sh : response.getHits()) { + String alertId = sh.getId(); + Alert alert = parseAlert(alertId, sh); + alertMap.put(alertId, alert); + } + response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); } + } finally { + client.prepareClearScroll().addScrollId(response.getScrollId()).get(); } - logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); + logger.info("Loaded [{}] alerts from the alert index.", alertMap.size()); } private Alert parseAlert(String alertId, SearchHit sh) { diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index ea281f309f2..ba82aa119e7 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -37,6 +37,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.builder() .put(super.nodeSettings(nodeOrdinal)) + .put("scroll.size", randomIntBetween(1, 100)) .put("plugin.mandatory", "alerts") .put("plugin.types", AlertsPlugin.class.getName()) .put("node.mode", "network")