Watcher: Store username on watch execution (#31873)
There is currently no way to see what user executed a watch. This commit adds the decrypted username to each execution in the watch history, in a new field "user". Closes #31772
This commit is contained in:
parent
ecd05d5be4
commit
637cac9061
|
@ -263,7 +263,8 @@ This is an example of the output:
|
|||
"type": "index"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"user": "test_admin" <4>
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
@ -281,6 +282,7 @@ This is an example of the output:
|
|||
<1> The id of the watch record as it would be stored in the `.watcher-history` index.
|
||||
<2> The watch record document as it would be stored in the `.watcher-history` index.
|
||||
<3> The watch execution results.
|
||||
<4> The user used to execute the watch.
|
||||
|
||||
You can set a different execution mode for every action by associating the mode
|
||||
name with the action id:
|
||||
|
|
|
@ -88,13 +88,17 @@ public class Authentication {
|
|||
throws IOException, IllegalArgumentException {
|
||||
assert ctx.getTransient(AuthenticationField.AUTHENTICATION_KEY) == null;
|
||||
|
||||
Authentication authentication = decode(header);
|
||||
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
|
||||
return authentication;
|
||||
}
|
||||
|
||||
public static Authentication decode(String header) throws IOException {
|
||||
byte[] bytes = Base64.getDecoder().decode(header);
|
||||
StreamInput input = StreamInput.wrap(bytes);
|
||||
Version version = Version.readVersion(input);
|
||||
input.setVersion(version);
|
||||
Authentication authentication = new Authentication(input);
|
||||
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
|
||||
return authentication;
|
||||
return new Authentication(input);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -8,6 +8,8 @@ package org.elasticsearch.xpack.core.watcher.execution;
|
|||
import org.elasticsearch.common.CheckedSupplier;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.xpack.core.security.authc.Authentication;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
|
||||
import org.elasticsearch.xpack.core.watcher.condition.Condition;
|
||||
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
|
||||
|
@ -18,6 +20,7 @@ import org.elasticsearch.xpack.core.watcher.watch.Payload;
|
|||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -43,6 +46,7 @@ public abstract class WatchExecutionContext {
|
|||
private Transform.Result transformResult;
|
||||
private ConcurrentMap<String, ActionWrapperResult> actionsResults = ConcurrentCollections.newConcurrentMap();
|
||||
private String nodeId;
|
||||
private String user;
|
||||
|
||||
public WatchExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
|
||||
this.id = new Wid(watchId, executionTime);
|
||||
|
@ -85,6 +89,7 @@ public abstract class WatchExecutionContext {
|
|||
public final void ensureWatchExists(CheckedSupplier<Watch, Exception> supplier) throws Exception {
|
||||
if (watch == null) {
|
||||
watch = supplier.get();
|
||||
user = WatchExecutionContext.getUsernameFromWatch(watch);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,6 +142,11 @@ public abstract class WatchExecutionContext {
|
|||
return nodeId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The user that executes the watch, which will be stored in the watch history
|
||||
*/
|
||||
public String getUser() { return user; }
|
||||
|
||||
public void start() {
|
||||
assert phase == ExecutionPhase.AWAITS_EXECUTION;
|
||||
relativeStartTime = System.nanoTime();
|
||||
|
@ -243,4 +253,19 @@ public abstract class WatchExecutionContext {
|
|||
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
|
||||
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a watch, this extracts and decodes the relevant auth header and returns the principal of the user that is
|
||||
* executing the watch.
|
||||
*/
|
||||
public static String getUsernameFromWatch(Watch watch) throws IOException {
|
||||
if (watch != null && watch.status() != null && watch.status().getHeaders() != null) {
|
||||
String header = watch.status().getHeaders().get(AuthenticationField.AUTHENTICATION_KEY);
|
||||
if (header != null) {
|
||||
Authentication auth = Authentication.decode(header);
|
||||
return auth.getUser().principal();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,12 +43,14 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
private static final ParseField METADATA = new ParseField("metadata");
|
||||
private static final ParseField EXECUTION_RESULT = new ParseField("result");
|
||||
private static final ParseField EXCEPTION = new ParseField("exception");
|
||||
private static final ParseField USER = new ParseField("user");
|
||||
|
||||
protected final Wid id;
|
||||
protected final Watch watch;
|
||||
private final String nodeId;
|
||||
protected final TriggerEvent triggerEvent;
|
||||
protected final ExecutionState state;
|
||||
private final String user;
|
||||
|
||||
// only emitted to xcontent in "debug" mode
|
||||
protected final Map<String, Object> vars;
|
||||
|
@ -60,7 +62,7 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
|
||||
private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map<String, Object> vars, ExecutableInput input,
|
||||
ExecutableCondition condition, Map<String, Object> metadata, Watch watch, WatchExecutionResult executionResult,
|
||||
String nodeId) {
|
||||
String nodeId, String user) {
|
||||
this.id = id;
|
||||
this.triggerEvent = triggerEvent;
|
||||
this.state = state;
|
||||
|
@ -71,15 +73,16 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
this.executionResult = executionResult;
|
||||
this.watch = watch;
|
||||
this.nodeId = nodeId;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, String nodeId) {
|
||||
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId);
|
||||
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId, null);
|
||||
}
|
||||
|
||||
private WatchRecord(WatchRecord record, ExecutionState state) {
|
||||
this(record.id, record.triggerEvent, state, record.vars, record.input, record.condition, record.metadata, record.watch,
|
||||
record.executionResult, record.nodeId);
|
||||
record.executionResult, record.nodeId, record.user);
|
||||
}
|
||||
|
||||
private WatchRecord(WatchExecutionContext context, ExecutionState state) {
|
||||
|
@ -88,12 +91,13 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
context.watch() != null ? context.watch().condition() : null,
|
||||
context.watch() != null ? context.watch().metadata() : null,
|
||||
context.watch(),
|
||||
null, context.getNodeId());
|
||||
null, context.getNodeId(), context.getUser());
|
||||
}
|
||||
|
||||
private WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
|
||||
this(context.id(), context.triggerEvent(), getState(executionResult), context.vars(), context.watch().input(),
|
||||
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId());
|
||||
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId(),
|
||||
context.getUser());
|
||||
}
|
||||
|
||||
public static ExecutionState getState(WatchExecutionResult executionResult) {
|
||||
|
@ -152,6 +156,9 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
builder.field(NODE.getPreferredName(), nodeId);
|
||||
builder.field(STATE.getPreferredName(), state.id());
|
||||
|
||||
if (user != null) {
|
||||
builder.field(USER.getPreferredName(), user);
|
||||
}
|
||||
if (watch != null && watch.status() != null) {
|
||||
builder.field(STATUS.getPreferredName(), watch.status(), params);
|
||||
}
|
||||
|
|
|
@ -13,8 +13,9 @@ public final class WatcherIndexTemplateRegistryField {
|
|||
// version 6: upgrade to ES 6, removal of _status field
|
||||
// version 7: add full exception stack traces for better debugging
|
||||
// version 8: fix slack attachment property not to be dynamic, causing field type issues
|
||||
// version 9: add a user field defining which user executed the watch
|
||||
// Note: if you change this, also inform the kibana team around the watcher-ui
|
||||
public static final String INDEX_TEMPLATE_VERSION = "8";
|
||||
public static final String INDEX_TEMPLATE_VERSION = "9";
|
||||
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
|
||||
public static final String WATCHES_TEMPLATE_NAME = ".watches";
|
||||
|
|
|
@ -120,6 +120,9 @@
|
|||
"messages": {
|
||||
"type": "text"
|
||||
},
|
||||
"user": {
|
||||
"type": "text"
|
||||
},
|
||||
"exception" : {
|
||||
"type" : "object",
|
||||
"enabled" : false
|
||||
|
|
|
@ -31,6 +31,9 @@ import org.elasticsearch.index.IndexNotFoundException;
|
|||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.security.authc.Authentication;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.security.user.User;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.Action;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
|
||||
|
@ -85,6 +88,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -1072,6 +1076,33 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
assertThat(watchRecord.state(), is(ExecutionState.EXECUTED));
|
||||
}
|
||||
|
||||
public void testLoadingWatchExecutionUser() throws Exception {
|
||||
DateTime now = now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
WatchStatus status = mock(WatchStatus.class);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
|
||||
// Should be null
|
||||
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
|
||||
context.ensureWatchExists(() -> watch);
|
||||
assertNull(context.getUser());
|
||||
|
||||
// Should still be null, header is not yet set
|
||||
when(watch.status()).thenReturn(status);
|
||||
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
|
||||
context.ensureWatchExists(() -> watch);
|
||||
assertNull(context.getUser());
|
||||
|
||||
Authentication authentication = new Authentication(new User("joe", "admin"),
|
||||
new Authentication.RealmRef("native_realm", "native", "node1"), null);
|
||||
|
||||
// Should no longer be null now that the proper header is set
|
||||
when(status.getHeaders()).thenReturn(Collections.singletonMap(AuthenticationField.AUTHENTICATION_KEY, authentication.encode()));
|
||||
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
|
||||
context.ensureWatchExists(() -> watch);
|
||||
assertThat(context.getUser(), equalTo("joe"));
|
||||
}
|
||||
|
||||
private WatchExecutionContext createMockWatchExecutionContext(String watchId, DateTime executionTime) {
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
when(ctx.id()).thenReturn(new Wid(watchId, executionTime));
|
||||
|
|
|
@ -21,6 +21,7 @@ watcher_manager:
|
|||
run_as:
|
||||
- powerless_user
|
||||
- watcher_manager
|
||||
- x_pack_rest_user
|
||||
|
||||
watcher_monitor:
|
||||
cluster:
|
||||
|
|
|
@ -74,10 +74,63 @@ teardown:
|
|||
id: "my_watch"
|
||||
- match: { watch_record.watch_id: "my_watch" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.user: "watcher_manager" }
|
||||
|
||||
|
||||
|
||||
|
||||
---
|
||||
"Test watch is runas user properly recorded":
|
||||
- do:
|
||||
xpack.watcher.put_watch:
|
||||
id: "my_watch"
|
||||
body: >
|
||||
{
|
||||
"trigger": {
|
||||
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
|
||||
},
|
||||
"input": {
|
||||
"search" : {
|
||||
"request" : {
|
||||
"indices" : [ "my_test_index" ],
|
||||
"body" :{
|
||||
"query" : { "match_all": {} }
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"condition" : {
|
||||
"compare" : {
|
||||
"ctx.payload.hits.total" : {
|
||||
"gte" : 1
|
||||
}
|
||||
}
|
||||
},
|
||||
"actions": {
|
||||
"logging": {
|
||||
"logging": {
|
||||
"text": "Successfully ran my_watch to test for search input"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
- match: { _id: "my_watch" }
|
||||
|
||||
- do:
|
||||
xpack.watcher.get_watch:
|
||||
id: "my_watch"
|
||||
- match: { _id: "my_watch" }
|
||||
- is_false: watch.status.headers
|
||||
|
||||
- do:
|
||||
headers: { es-security-runas-user: x_pack_rest_user }
|
||||
xpack.watcher.execute_watch:
|
||||
id: "my_watch"
|
||||
- match: { watch_record.watch_id: "my_watch" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.user: "x_pack_rest_user" }
|
||||
|
||||
|
||||
---
|
||||
"Test watch search input does not work against index user is not allowed to read":
|
||||
|
||||
|
@ -130,6 +183,7 @@ teardown:
|
|||
- match: { watch_record.watch_id: "my_watch" }
|
||||
# because we are not allowed to read the index, there wont be any data
|
||||
- match: { watch_record.state: "execution_not_needed" }
|
||||
- match: { watch_record.user: "watcher_manager" }
|
||||
|
||||
|
||||
---
|
||||
|
@ -272,6 +326,7 @@ teardown:
|
|||
id: "my_watch"
|
||||
- match: { watch_record.watch_id: "my_watch" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.user: "watcher_manager" }
|
||||
|
||||
- do:
|
||||
get:
|
||||
|
@ -320,6 +375,7 @@ teardown:
|
|||
id: "my_watch"
|
||||
- match: { watch_record.watch_id: "my_watch" }
|
||||
- match: { watch_record.state: "executed" }
|
||||
- match: { watch_record.user: "watcher_manager" }
|
||||
|
||||
- do:
|
||||
get:
|
||||
|
|
Loading…
Reference in New Issue