Watcher: Refresh watches index after write operation (elastic/elasticsearch#4865)

When a watch gets written by an external process, we should refresh
immediately, so that watches and their state are immediately available,
also via search which is needed for the Watcher UI.

Closes elastic/elasticsearch#4695

Original commit: elastic/x-pack-elasticsearch@8bed60c928
This commit is contained in:
Alexander Reelsen 2017-02-10 10:50:32 +01:00 committed by GitHub
parent 497147c30d
commit 232f3e76a4
8 changed files with 36 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -135,11 +136,13 @@ public class WatcherClientProxy extends ClientProxy {
public void getWatch(String id, ActionListener<GetResponse> listener) { public void getWatch(String id, ActionListener<GetResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true); GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
getRequest.realtime(true);
client.get(preProcess(getRequest), listener); client.get(preProcess(getRequest), listener);
} }
public void deleteWatch(String id, ActionListener<DeleteResponse> listener) { public void deleteWatch(String id, ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, id); DeleteRequest request = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, id);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(preProcess(request), listener); client.delete(preProcess(request), listener);
} }

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -93,6 +94,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution // this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.version(response.getVersion()); updateRequest.version(response.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();
builder.startObject() builder.startObject()
.startObject(Watch.Field.STATUS.getPreferredName()) .startObject(Watch.Field.STATUS.getPreferredName())

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
@ -81,6 +82,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
DateTime now = new DateTime(clock.millis(), UTC); DateTime now = new DateTime(clock.millis(), UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = activateWatchBuilder(request.isActivate(), now); XContentBuilder builder = activateWatchBuilder(request.isActivate(), now);
updateRequest.doc(builder); updateRequest.doc(builder);

View File

@ -83,6 +83,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
BytesReference bytesReference = builder.bytes(); BytesReference bytesReference = builder.bytes();
IndexRequest indexRequest = new IndexRequest(Watch.INDEX).type(Watch.DOC_TYPE).id(request.getId()); IndexRequest indexRequest = new IndexRequest(Watch.INDEX).type(Watch.DOC_TYPE).id(request.getId());
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.source(bytesReference, XContentType.JSON); indexRequest.source(bytesReference, XContentType.JSON);
client.index(indexRequest, ActionListener.wrap(indexResponse -> { client.index(indexRequest, ActionListener.wrap(indexResponse -> {

View File

@ -45,6 +45,12 @@
- match: { "_status.actions.test_index.ack.state" : "awaits_successful_execution" } - match: { "_status.actions.test_index.ack.state" : "awaits_successful_execution" }
- do:
search:
index: .watches
- match: { hits.total: 1 }
- match: { hits.hits.0._source._status.actions.test_index.ack.state: "awaits_successful_execution" }
- do: - do:
xpack.watcher.delete_watch: xpack.watcher.delete_watch:
id: "my_watch" id: "my_watch"

View File

@ -49,6 +49,12 @@
- match: { _status.state.active : false } - match: { _status.state.active : false }
- do:
search:
index: .watches
- match: { hits.total: 1 }
- match: { hits.hits.0._source._status.state.active: false }
- do: - do:
xpack.watcher.get_watch: xpack.watcher.get_watch:
id: "my_watch" id: "my_watch"
@ -62,6 +68,12 @@
- match: { _status.state.active : true } - match: { _status.state.active : true }
- do:
search:
index: .watches
- match: { hits.total: 1 }
- match: { hits.hits.0._source._status.state.active: true }
- do: - do:
xpack.watcher.get_watch: xpack.watcher.get_watch:
id: "my_watch" id: "my_watch"

View File

@ -43,6 +43,11 @@
id: "my_watch" id: "my_watch"
- match: { found: true } - match: { found: true }
- do:
search:
index: .watches
- match: { hits.total: 0 }
--- ---
"Non existent watch returns 404": "Non existent watch returns 404":
- do: - do:

View File

@ -47,6 +47,11 @@ teardown:
- match: { _id: "my_watch" } - match: { _id: "my_watch" }
- match: { created: true } - match: { created: true }
- do:
search:
index: .watches
- match: { hits.total: 1 }
- do: - do:
xpack.watcher.get_watch: xpack.watcher.get_watch:
id: "my_watch" id: "my_watch"