Watcher: Fix put watch action (#31524)

If no version is specified when putting a watch, the index API should be
used instead of the update API, so that the whole watch gets overwritten
instead of being merged with the existing one.

Merging only happens when a version is specified, so that credentials can be omitted, which is important for the watcher UI.
This commit is contained in:
Alexander Reelsen 2018-06-25 18:25:34 +02:00 committed by GitHub
parent adfcea2af6
commit bb1d4aaf17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 8 deletions

View File

@ -0,0 +1,73 @@
---
"Test put watch api without version overwrites watch":
- do:
cluster.health:
wait_for_status: yellow
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"foo": "bar"
}
},
"actions": {
"logging": {
"logging": {
"text": "yaml test"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { watch.input.simple.foo: "bar" }
# change the simple input fields, then ensure the old
# field does not exist on get
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"spam": "eggs"
}
},
"actions": {
"logging": {
"logging": {
"text": "yaml test"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { watch.input.simple.spam: "eggs" }
- is_false: watch.input.simple.foo

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.watcher.transport.actions.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -89,18 +91,29 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
try (XContentBuilder builder = jsonBuilder()) {
watch.toXContent(builder, DEFAULT_PARAMS);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
updateRequest.docAsUpsert(isUpdate == false);
updateRequest.version(request.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(builder);
if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
updateRequest.version(request.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(builder);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(response -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
}, listener::onFailure),
client::update);
} else {
IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
indexRequest.source(builder);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,
ActionListener.<IndexResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
}, listener::onFailure),
client::update);
client::index);
}
}
} catch (Exception e) {
listener.onFailure(e);