Move the get alert api over to be a master node api and that it fetches the alert from the in memory alert store instead via core get api from an index.
Original commit: elastic/x-pack-elasticsearch@6bf471bf74
This commit is contained in:
parent
05848603d8
commit
a7e85df649
|
@ -121,6 +121,14 @@ public class AlertManager extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves an alert by name from memory.
|
||||
*/
|
||||
// TODO: add version, fields, etc support that the core get api has as well.
|
||||
public Alert getAlert(String alertName) {
|
||||
return alertsStore.getAlert(alertName);
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
|
|
@ -5,11 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.alerts.transport.actions.get;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -21,7 +19,7 @@ import java.io.IOException;
|
|||
/**
|
||||
* The request to get the alert by name (id)
|
||||
*/
|
||||
public class GetAlertRequest extends ActionRequest<GetAlertRequest> implements IndicesRequest {
|
||||
public class GetAlertRequest extends MasterNodeOperationRequest<GetAlertRequest> {
|
||||
|
||||
private String alertName;
|
||||
private long version = Versions.MATCH_ANY;
|
||||
|
@ -49,16 +47,6 @@ public class GetAlertRequest extends ActionRequest<GetAlertRequest> implements I
|
|||
return validationException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return new String[]{AlertsStore.ALERT_INDEX};
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The name of the alert to retrieve
|
||||
|
|
|
@ -16,7 +16,7 @@ import java.io.IOException;
|
|||
* The GetAlertResponse the response class wraps a GetResponse containing the alert source
|
||||
*/
|
||||
public class GetAlertResponse extends ActionResponse {
|
||||
private boolean found = false;
|
||||
|
||||
private GetResponse getResponse;
|
||||
|
||||
public GetAlertResponse() {
|
||||
|
|
|
@ -5,39 +5,78 @@
|
|||
*/
|
||||
package org.elasticsearch.alerts.transport.actions.get;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Performs the get operation.
|
||||
*/
|
||||
public class TransportGetAlertAction extends TransportAction<GetAlertRequest, GetAlertResponse> {
|
||||
public class TransportGetAlertAction extends TransportMasterNodeOperationAction<GetAlertRequest, GetAlertResponse> {
|
||||
|
||||
private final Client client;
|
||||
private final AlertManager alertManager;
|
||||
|
||||
@Inject
|
||||
public TransportGetAlertAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client) {
|
||||
super(settings, GetAlertAction.NAME, threadPool, actionFilters);
|
||||
this.client = client;
|
||||
public TransportGetAlertAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters, AlertManager alertManager) {
|
||||
super(settings, GetAlertAction.NAME, transportService, clusterService, threadPool, actionFilters);
|
||||
this.alertManager = alertManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(GetAlertRequest request, ActionListener<GetAlertResponse> listener) {
|
||||
try {
|
||||
GetResponse getResponse = client.prepareGet(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, request.alertName())
|
||||
.setVersion(request.version())
|
||||
.setVersionType(request.versionType()).execute().actionGet();
|
||||
GetAlertResponse response = new GetAlertResponse(getResponse);
|
||||
listener.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME; // Super lightweight operation, so don't fork
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetAlertRequest newRequest() {
|
||||
return new GetAlertRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetAlertResponse newResponse() {
|
||||
return new GetAlertResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(GetAlertRequest request, ClusterState state, ActionListener<GetAlertResponse> listener) throws ElasticsearchException {
|
||||
Alert alert = alertManager.getAlert(request.alertName());
|
||||
GetResult getResult;
|
||||
if (alert != null) {
|
||||
BytesReference alertSource = null;
|
||||
try (XContentBuilder builder = XContentBuilder.builder(alert.getContentType().xContent())) {
|
||||
builder.value(alert);
|
||||
alertSource = builder.bytes();
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
getResult = new GetResult(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, alert.getAlertName(), alert.getVersion(), true, alertSource, null);
|
||||
} else {
|
||||
getResult = new GetResult(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, request.alertName(), -1, false, null, null);
|
||||
}
|
||||
listener.onResponse(new GetAlertResponse(new GetResponse(getResult)));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(GetAlertRequest request, ClusterState state) {
|
||||
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, AlertsStore.ALERT_INDEX);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.search.SearchType;
|
|||
import org.elasticsearch.alerts.client.AlertsClient;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.index.query.FilterBuilders;
|
||||
|
@ -30,6 +31,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -47,6 +49,11 @@ public class BasicAlertingTest extends AbstractAlertingTests {
|
|||
.setAlertSource(alertSource)
|
||||
.get();
|
||||
assertAlertTriggered("my-first-alert", 1);
|
||||
|
||||
GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("my-first-alert").get();
|
||||
assertThat(getAlertResponse.getResponse().isExists(), is(true));
|
||||
assertThat(getAlertResponse.getResponse().isSourceEmpty(), is(false));
|
||||
assertThat(getAlertResponse.getResponse().getSource().get("last_action_executed"), notNullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue