Restored some code and fix the assert trigger methods
Original commit: elastic/x-pack-elasticsearch@7ca18b77b9
This commit is contained in:
parent
871274adbd
commit
168bed9d32
|
@ -33,7 +33,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
|||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -103,28 +102,21 @@ public class AlertActionManager extends AbstractComponent {
|
|||
if (started.get()) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
if (indices.length == 0) {
|
||||
logger.info("No previous .alerthistory index, skip loading of alert actions");
|
||||
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
|
||||
doStart();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = state.getMetaData().index(index);
|
||||
if (indexMetaData != null) {
|
||||
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
|
||||
logger.info("Not all primary shards of the [{}] index are started", index);
|
||||
return false;
|
||||
}
|
||||
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
if (indices.length == 0) {
|
||||
logger.info("No previous .alerthistory index, skip loading of alert actions");
|
||||
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
|
||||
doStart();
|
||||
return true;
|
||||
}
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = state.getMetaData().index(index);
|
||||
if (indexMetaData != null) {
|
||||
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
|
||||
logger.info("Not all primary shards of the [{}] index are started", index);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (Exception e){
|
||||
logger.error("Unable to check index availability", e);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -263,30 +255,15 @@ public class AlertActionManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
addAlertAction(alert, scheduledFireTime, fireTime, true);
|
||||
}
|
||||
|
||||
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime, boolean retry) throws IOException {
|
||||
ensureStarted();
|
||||
logger.debug("Adding alert action for alert [{}]", alert.alertName());
|
||||
String alertHistoryIndex = getAlertHistoryIndexNameForTime(scheduledFireTime);
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
|
||||
try {
|
||||
IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.setOpType(IndexRequest.OpType.CREATE)
|
||||
.get();
|
||||
entry.setVersion(response.getVersion());
|
||||
} catch (IndexMissingException ime) {
|
||||
///@TODO This really shouldn't be happening
|
||||
if (retry) {
|
||||
logger.error("Unable to dynamically implicitly create alert history index [" + alertHistoryIndex + "] creating explicitly");
|
||||
client.admin().indices().prepareCreate(alertHistoryIndex).get();
|
||||
addAlertAction(alert, scheduledFireTime, fireTime, false);
|
||||
} else {
|
||||
throw new ElasticsearchException("Unable to create alert history index [" + alertHistoryIndex + "]", ime);
|
||||
}
|
||||
}
|
||||
IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.setOpType(IndexRequest.OpType.CREATE)
|
||||
.get();
|
||||
entry.setVersion(response.getVersion());
|
||||
|
||||
long currentSize = actionsToBeProcessed.size() + 1;
|
||||
actionsToBeProcessed.add(entry);
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
*/
|
||||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
@ -17,6 +16,7 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -37,8 +37,7 @@ import java.util.*;
|
|||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
|
||||
|
@ -132,8 +131,15 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
|||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX+"*").get();
|
||||
assertThat(indicesExistsResponse.isExists(), is(true));
|
||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
String[] alertHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
assertThat(alertHistoryIndices, not(emptyArray()));
|
||||
for (String index : alertHistoryIndices) {
|
||||
IndexRoutingTable routingTable = state.getRoutingTable().index(index);
|
||||
assertThat(routingTable, notNullValue());
|
||||
assertThat(routingTable.allPrimaryShardsActive(), is(true));
|
||||
}
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
|
||||
|
@ -158,8 +164,15 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
|||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX+"*").get();
|
||||
assertThat(indicesExistsResponse.isExists(), is(true));
|
||||
// The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started
|
||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
String[] alertHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
assertThat(alertHistoryIndices, not(emptyArray()));
|
||||
for (String index : alertHistoryIndices) {
|
||||
IndexRoutingTable routingTable = state.getRoutingTable().index(index);
|
||||
assertThat(routingTable, notNullValue());
|
||||
assertThat(routingTable.allPrimaryShardsActive(), is(true));
|
||||
}
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
|
|
Loading…
Reference in New Issue