Moved alert loading to use scan scroll instead of a single normal search

Original commit: elastic/x-pack-elasticsearch@06e70836ec
This commit is contained in:
Martijn van Groningen 2014-10-29 10:20:36 +01:00
parent 1e7fc84f06
commit 12a6de0a57
2 changed files with 25 additions and 15 deletions

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
@ -60,12 +61,17 @@ public class AlertsStore extends AbstractComponent {
private final AlertActionRegistry alertActionRegistry; private final AlertActionRegistry alertActionRegistry;
private final ConcurrentMap<String,Alert> alertMap; private final ConcurrentMap<String,Alert> alertMap;
private final int scrollSize;
private final TimeValue scrollTimeout;
@Inject @Inject
public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) { public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) {
super(settings); super(settings);
this.client = client; this.client = client;
this.alertActionRegistry = alertActionRegistry; this.alertActionRegistry = alertActionRegistry;
alertMap = ConcurrentCollections.newConcurrentMap(); this.alertMap = ConcurrentCollections.newConcurrentMap();
this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30));
} }
/** /**
@ -172,22 +178,25 @@ public class AlertsStore extends AbstractComponent {
createAlertsIndex(); createAlertsIndex();
} }
SearchResponse searchResponse = client.prepareSearch().setSource( SearchResponse response = client.prepareSearch()
"{ \"query\" : " + .setSearchType(SearchType.SCAN)
"{ \"match_all\" : {}}," + .setScroll(scrollTimeout)
"\"size\" : \"100\"" + .setSize(scrollSize)
"}" .setTypes(AlertManager.ALERT_TYPE)
).setTypes(AlertManager.ALERT_TYPE).setIndices(AlertManager.ALERT_INDEX).execute().actionGet(); .setIndices(AlertManager.ALERT_INDEX).get();
for (SearchHit sh : searchResponse.getHits()) { try {
String alertId = sh.getId(); for (; response.getHits().hits().length != 0; response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get()) {
try { for (SearchHit sh : response.getHits()) {
Alert alert = parseAlert(alertId, sh); String alertId = sh.getId();
alertMap.put(alertId, alert); Alert alert = parseAlert(alertId, sh);
} catch (ElasticsearchException e) { alertMap.put(alertId, alert);
logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); }
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
} }
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
} }
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); logger.info("Loaded [{}] alerts from the alert index.", alertMap.size());
} }
private Alert parseAlert(String alertId, SearchHit sh) { private Alert parseAlert(String alertId, SearchHit sh) {

View File

@ -37,6 +37,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder() return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal)) .put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.mandatory", "alerts") .put("plugin.mandatory", "alerts")
.put("plugin.types", AlertsPlugin.class.getName()) .put("plugin.types", AlertsPlugin.class.getName())
.put("node.mode", "network") .put("node.mode", "network")