Watcher: Remove all traces from execution on master node (elastic/x-pack-elasticsearch#2383)

As there are no master node operations anymore.

* TransportActions are regular Actions now
* Watcher requests are now ActionRequests, no MasterNodeRequests anymore
* REST spec does not contain master node timeout parameters anymore
* WatcherLifeCycleService does not have a check anymore if watcher is able to run distributed, this will be a given in 7.0
* Some serialization BWC checks against version 5 have been removed

Original commit: elastic/x-pack-elasticsearch@4607dd538c
This commit is contained in:
Alexander Reelsen 2017-09-12 15:05:26 +02:00 committed by GitHub
parent e4882b36b7
commit c3f3ae5391
45 changed files with 103 additions and 584 deletions

View File

@ -166,7 +166,6 @@ setups['my_inactive_watch'] = '''
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
active: false
body: >
{

View File

@ -35,21 +35,3 @@ Response:
--------------------------------------------------
// TESTRESPONSE
[float]
==== Timeouts
When deleting a watch while it is executing, the delete action will block and
wait for the watch execution to finish. Depending on the nature of the watch, in
some situations this can take a while. For this reason, the delete watch action
is associated with a timeout that is set to 10 seconds by default. You can
control this timeout by passing in the `master_timeout` parameter.
The following snippet shows how to change the default timeout of the delete
action to 30 seconds:
[source,js]
--------------------------------------------------
DELETE _xpack/watcher/watch/my_watch?master_timeout=30s
--------------------------------------------------
// CONSOLE
// TEST[setup:my_active_watch]

View File

@ -98,22 +98,6 @@ A watch has the following fields:
|======
[float]
==== Timeouts
When updating a watch while it is executing, the put action will block and wait
for the watch execution to finish. Depending on the nature of the watch, in some
situations this can take a while. For this reason, the put watch action is
associated with a timeout that is set to 10 seconds by default. You can control
this timeout by passing in the `master_timeout` parameter.
The following snippet shows how to change the default timeout of the put action
to 30 seconds:
[source,js]
--------------------------------------------------
PUT _xpack/watcher/watch/my-watch?master_timeout=30s
--------------------------------------------------
[[watcher-api-put-watch-active-state]]
==== Controlling Default Active State

View File

@ -192,23 +192,18 @@ final class WatcherIndexingListener extends AbstractComponent implements Indexin
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
boolean isWatchExecutionDistributed = WatcherLifeCycleService.isWatchExecutionDistributed(event.state());
if (isWatchExecutionDistributed) {
if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) {
try {
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (metaData == null) {
configuration = INACTIVE;
} else {
checkWatchIndexHasChanged(metaData, event);
}
} catch (IllegalStateException e) {
logger.error("error loading watches index: [{}]", e.getMessage());
if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) {
try {
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (metaData == null) {
configuration = INACTIVE;
} else {
checkWatchIndexHasChanged(metaData, event);
}
} catch (IllegalStateException e) {
logger.error("error loading watches index: [{}]", e.getMessage());
configuration = INACTIVE;
}
} else {
configuration = INACTIVE;
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@ -115,90 +114,60 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
if (currentWatcherStopped) {
executor.execute(() -> this.stop("watcher manually marked to shutdown in cluster state update, shutting down"));
} else {
// if there are old nodes in the cluster hosting the watch index shards, we cannot run distributed, only on the master node
boolean isDistributedWatchExecutionEnabled = isWatchExecutionDistributed(event.state());
if (isDistributedWatchExecutionEnabled) {
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
// no watcher index, time to pause, as there are for sure no shards on this node
if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) {
previousAllocationIds.set(Collections.emptyList());
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
}
return;
}
String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
// no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) {
if (previousAllocationIds.get().isEmpty() == false) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
}
return;
}
List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);
if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
}
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME,
event.state().metaData());
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(watcherIndexMetaData);
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
} else {
logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]",
isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex);
// no watcher index, time to pause, as there are for sure no shards on this node
if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) {
previousAllocationIds.set(Collections.emptyList());
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
}
return;
}
} else {
if (event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
executor.execute(() -> start(event.state(), false));
String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
// no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) {
if (previousAllocationIds.get().isEmpty() == false) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
}
return;
}
List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);
if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
}
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME,
event.state().metaData());
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(watcherIndexMetaData);
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
} else {
if (watcherService.state() == WatcherState.STARTED || watcherService.state() == WatcherState.STARTING) {
executor.execute(() -> watcherService.pauseExecution("Pausing watcher, cluster contains old nodes not supporting" +
" distributed watch execution"));
}
logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]",
isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex);
}
}
}
}
/**
* Checks if the preconditions are given to run watcher with distributed watch execution.
* The following requirements need to be fulfilled
*
* 1. The master node must run on a version greather than or equal 6.0
* 2. The nodes holding the watcher shards must run on a version greater than or equal 6.0
*
* @param state The cluster to check against
* @return true, if the above requirements are fulfilled, false otherwise
*/
public static boolean isWatchExecutionDistributed(ClusterState state) {
// short circuit if all nodes are on 6.x, should be standard after upgrade
return state.nodes().getMinNodeVersion().onOrAfter(Version.V_6_0_0_beta1);
}
public WatcherMetaData watcherMetaData() {
return watcherMetaData;
}

View File

@ -29,6 +29,7 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
* The rest action to ack a watch
*/
public class RestAckWatchAction extends WatcherRestHandler {
public RestAckWatchAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, URI_BASE + "/watch/{id}/_ack", this);
@ -49,7 +50,6 @@ public class RestAckWatchAction extends WatcherRestHandler {
if (actions != null) {
ackWatchRequest.setActionIds(actions);
}
ackWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", ackWatchRequest.masterNodeTimeout()));
return channel -> client.ackWatch(ackWatchRequest, new RestBuilderListener<AckWatchResponse>(channel) {
@Override
public RestResponse buildResponse(AckWatchResponse response, XContentBuilder builder) throws Exception {
@ -60,5 +60,4 @@ public class RestAckWatchAction extends WatcherRestHandler {
}
});
}
}

View File

@ -38,7 +38,6 @@ public class RestDeleteWatchAction extends WatcherRestHandler {
@Override
protected RestChannelConsumer doPrepareRequest(final RestRequest request, WatcherClient client) throws IOException {
DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id"));
deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout()));
return channel -> client.deleteWatch(deleteWatchRequest, new RestBuilderListener<DeleteWatchResponse>(channel) {
@Override
public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception {

View File

@ -46,7 +46,6 @@ public class RestPutWatchAction extends WatcherRestHandler implements RestReques
protected RestChannelConsumer doPrepareRequest(final RestRequest request, WatcherClient client) throws IOException {
PutWatchRequest putWatchRequest =
new PutWatchRequest(request.param("id"), request.content(), request.getXContentType());
putWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWatchRequest.masterNodeTimeout()));
putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive()));
return channel -> client.putWatch(putWatchRequest, new RestBuilderListener<PutWatchResponse>(channel) {
@Override

View File

@ -6,16 +6,11 @@
package org.elasticsearch.xpack.watcher.transport.actions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
@ -23,54 +18,25 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.function.Supplier;
public abstract class WatcherTransportAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse>
extends TransportMasterNodeAction<Request, Response> {
public abstract class WatcherTransportAction<Request extends ActionRequest, Response extends ActionResponse>
extends HandledTransportAction<Request, Response> {
protected final XPackLicenseState licenseState;
private final ClusterService clusterService;
private final Supplier<Response> response;
public WatcherTransportAction(Settings settings, String actionName, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
XPackLicenseState licenseState, ClusterService clusterService, Supplier<Request> request,
Supplier<Response> response) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request);
XPackLicenseState licenseState, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.licenseState = licenseState;
this.clusterService = clusterService;
this.response = response;
}
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected Response newResponse() {
return response.get();
}
protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception;
protected boolean localExecute(Request request) {
return WatcherLifeCycleService.isWatchExecutionDistributed(clusterService.state());
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
IndexMetaData index = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (index != null) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getIndex().getName());
} else {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
}
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
if (licenseState.isWatcherAllowed()) {

View File

@ -5,13 +5,12 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
@ -20,9 +19,7 @@ import java.util.Locale;
/**
* A ack watch request to ack a watch by name (id)
*/
public class AckWatchRequest extends MasterNodeRequest<AckWatchRequest> {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
public class AckWatchRequest extends ActionRequest {
private String watchId;
private String[] actionIds = Strings.EMPTY_ARRAY;
@ -34,7 +31,6 @@ public class AckWatchRequest extends MasterNodeRequest<AckWatchRequest> {
public AckWatchRequest(String watchId, String... actionIds) {
this.watchId = watchId;
this.actionIds = actionIds;
masterNodeTimeout(DEFAULT_TIMEOUT);
}
/**

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* A ack watch action request builder.
*/
public class AckWatchRequestBuilder extends MasterNodeOperationRequestBuilder<AckWatchRequest, AckWatchResponse, AckWatchRequestBuilder> {
public class AckWatchRequestBuilder extends ActionRequestBuilder<AckWatchRequest, AckWatchResponse, AckWatchRequestBuilder> {
public AckWatchRequestBuilder(ElasticsearchClient client) {
super(client, AckWatchAction.INSTANCE, new AckWatchRequest());
@ -25,6 +25,4 @@ public class AckWatchRequestBuilder extends MasterNodeOperationRequestBuilder<Ac
request.setActionIds(actionIds);
return this;
}
}

View File

@ -12,10 +12,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@ -46,17 +44,16 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
Watch.Parser parser, InternalClient client, ClusterService clusterService) {
Watch.Parser parser, InternalClient client) {
super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, AckWatchRequest::new, AckWatchResponse::new);
licenseState, AckWatchRequest::new);
this.clock = clock;
this.parser = parser;
this.client = client;
}
@Override
protected void masterOperation(AckWatchRequest request, ClusterState state,
ActionListener<AckWatchResponse> listener) throws Exception {
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.activate;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
@ -18,9 +17,7 @@ import java.io.IOException;
/**
* A ack watch request to ack a watch by name (id)
*/
public class ActivateWatchRequest extends MasterNodeRequest<ActivateWatchRequest> {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
public class ActivateWatchRequest extends ActionRequest {
private String watchId;
private boolean activate;
@ -32,7 +29,6 @@ public class ActivateWatchRequest extends MasterNodeRequest<ActivateWatchRequest
public ActivateWatchRequest(String watchId, boolean activate) {
this.watchId = watchId;
this.activate = activate;
masterNodeTimeout(DEFAULT_TIMEOUT);
}
/**

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.activate;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* A activate watch action request builder.
*/
public class ActivateWatchRequestBuilder extends MasterNodeOperationRequestBuilder<ActivateWatchRequest, ActivateWatchResponse,
public class ActivateWatchRequestBuilder extends ActionRequestBuilder<ActivateWatchRequest, ActivateWatchResponse,
ActivateWatchRequestBuilder> {
public ActivateWatchRequestBuilder(ElasticsearchClient client) {

View File

@ -12,10 +12,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -25,7 +23,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
@ -45,25 +42,20 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
private final Clock clock;
private final Watch.Parser parser;
private final Client client;
private final TriggerService triggerService;
@Inject
public TransportActivateWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Clock clock,
XPackLicenseState licenseState, Watch.Parser parser, ClusterService clusterService,
InternalClient client, TriggerService triggerService) {
XPackLicenseState licenseState, Watch.Parser parser, InternalClient client) {
super(settings, ActivateWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, ActivateWatchRequest::new, ActivateWatchResponse::new);
licenseState, ActivateWatchRequest::new);
this.clock = clock;
this.parser = parser;
this.client = client;
this.triggerService = triggerService;
}
@Override
protected void masterOperation(ActivateWatchRequest request, ClusterState state, ActionListener<ActivateWatchResponse> listener)
throws Exception {
protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
try {
DateTime now = new DateTime(clock.millis(), UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
@ -84,13 +76,6 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
XContentType.JSON);
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
if (localExecute(request)) {
if (watch.status().state().isActive()) {
triggerService.add(watch);
} else {
triggerService.remove(watch.id());
}
}
listener.onResponse(new ActivateWatchResponse(watch.status()));
} else {
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
@ -114,5 +99,4 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
return builder;
}
}
}

View File

@ -5,13 +5,12 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.delete;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
@ -19,9 +18,7 @@ import java.io.IOException;
/**
* A delete watch request to delete an watch by name (id)
*/
public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
public class DeleteWatchRequest extends ActionRequest {
private String id;
private long version = Versions.MATCH_ANY;
@ -32,7 +29,6 @@ public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
public DeleteWatchRequest(String id) {
this.id = id;
masterNodeTimeout(DEFAULT_TIMEOUT);
}
/**

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.delete;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* A delete document action request builder.
*/
public class DeleteWatchRequestBuilder extends MasterNodeOperationRequestBuilder<DeleteWatchRequest, DeleteWatchResponse,
public class DeleteWatchRequestBuilder extends ActionRequestBuilder<DeleteWatchRequest, DeleteWatchResponse,
DeleteWatchRequestBuilder> {
public DeleteWatchRequestBuilder(ElasticsearchClient client) {

View File

@ -5,15 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.execute;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode;
@ -28,7 +26,7 @@ import java.util.Map;
/**
* An execute watch request to execute a watch by id
*/
public class ExecuteWatchRequest extends MasterNodeReadRequest<ExecuteWatchRequest> {
public class ExecuteWatchRequest extends ActionRequest {
public static final String INLINE_WATCH_ID = "_inlined_";
@ -240,11 +238,7 @@ public class ExecuteWatchRequest extends MasterNodeReadRequest<ExecuteWatchReque
}
if (in.readBoolean()) {
watchSource = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(watchSource);
}
xContentType = XContentType.readFrom(in);
}
debug = in.readBoolean();
}
@ -272,9 +266,7 @@ public class ExecuteWatchRequest extends MasterNodeReadRequest<ExecuteWatchReque
out.writeBoolean(watchSource != null);
if (watchSource != null) {
out.writeBytesReference(watchSource);
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType.writeTo(out);
}
xContentType.writeTo(out);
}
out.writeBoolean(debug);
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.execute;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentType;
@ -19,7 +19,7 @@ import java.util.Map;
/**
* A execute watch action request builder.
*/
public class ExecuteWatchRequestBuilder extends MasterNodeOperationRequestBuilder<ExecuteWatchRequest, ExecuteWatchResponse,
public class ExecuteWatchRequestBuilder extends ActionRequestBuilder<ExecuteWatchRequest, ExecuteWatchResponse,
ExecuteWatchRequestBuilder> {
public ExecuteWatchRequestBuilder(ElasticsearchClient client) {

View File

@ -12,10 +12,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -62,10 +60,9 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ExecutionService executionService, Clock clock, XPackLicenseState licenseState,
Watch.Parser watchParser, InternalClient client, TriggerService triggerService,
ClusterService clusterService) {
Watch.Parser watchParser, InternalClient client, TriggerService triggerService) {
super(settings, ExecuteWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, ExecuteWatchRequest::new, ExecuteWatchResponse::new);
licenseState, ExecuteWatchRequest::new);
this.executionService = executionService;
this.clock = clock;
this.triggerService = triggerService;
@ -74,8 +71,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
@Override
protected void masterOperation(ExecuteWatchRequest request, ClusterState state,
ActionListener<ExecuteWatchResponse> listener) throws Exception {
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
if (request.getId() != null) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
@ -139,5 +135,4 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
});
}
}

View File

@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -17,7 +17,7 @@ import java.io.IOException;
/**
* The request to get the watch by name (id)
*/
public class GetWatchRequest extends MasterNodeReadRequest<GetWatchRequest> {
public class GetWatchRequest extends ActionRequest {
private String id;

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.get;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* A delete document action request builder.
*/
public class GetWatchRequestBuilder extends MasterNodeReadOperationRequestBuilder<GetWatchRequest, GetWatchResponse,
public class GetWatchRequestBuilder extends ActionRequestBuilder<GetWatchRequest, GetWatchResponse,
GetWatchRequestBuilder> {
public GetWatchRequestBuilder(ElasticsearchClient client, String id) {

View File

@ -9,10 +9,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -41,17 +39,16 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Inject
public TransportGetWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, XPackLicenseState licenseState,
Watch.Parser parser, Clock clock, InternalClient client, ClusterService clusterService) {
Watch.Parser parser, Clock clock, InternalClient client) {
super(settings, GetWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, GetWatchRequest::new, GetWatchResponse::new);
licenseState, GetWatchRequest::new);
this.parser = parser;
this.clock = clock;
this.client = client;
}
@Override
protected void masterOperation(GetWatchRequest request, ClusterState state,
ActionListener<GetWatchResponse> listener) throws Exception {
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

View File

@ -6,15 +6,12 @@
package org.elasticsearch.xpack.watcher.transport.actions.put;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -25,9 +22,7 @@ import java.io.IOException;
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
*/
public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(10);
public class PutWatchRequest extends ActionRequest {
private String id;
private BytesReference source;
@ -45,7 +40,6 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
this.id = id;
this.source = source;
this.xContentType = xContentType;
masterNodeTimeout(DEFAULT_TIMEOUT);
}
/**
@ -125,11 +119,7 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
id = in.readString();
source = in.readBytesReference();
active = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(source);
}
xContentType = XContentType.readFrom(in);
}
@Override
@ -138,9 +128,6 @@ public class PutWatchRequest extends MasterNodeRequest<PutWatchRequest> {
out.writeString(id);
out.writeBytesReference(source);
out.writeBoolean(active);
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType.writeTo(out);
}
xContentType.writeTo(out);
}
}

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.put;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
public class PutWatchRequestBuilder extends MasterNodeOperationRequestBuilder<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> {
public class PutWatchRequestBuilder extends ActionRequestBuilder<PutWatchRequest, PutWatchResponse, PutWatchRequestBuilder> {
public PutWatchRequestBuilder(ElasticsearchClient client) {
super(client, PutWatchAction.INSTANCE, new PutWatchRequest());

View File

@ -10,9 +10,7 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -24,7 +22,6 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
@ -39,24 +36,20 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
private final Clock clock;
private final Watch.Parser parser;
private final InternalClient client;
private final TriggerService triggerService;
@Inject
public TransportPutWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
Watch.Parser parser, InternalClient client, ClusterService clusterService,
TriggerService triggerService) {
Watch.Parser parser, InternalClient client) {
super(settings, PutWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, PutWatchRequest::new, PutWatchResponse::new);
licenseState, PutWatchRequest::new);
this.clock = clock;
this.parser = parser;
this.client = client;
this.triggerService = triggerService;
}
@Override
protected void masterOperation(PutWatchRequest request, ClusterState state,
ActionListener<PutWatchResponse> listener) throws Exception {
protected void doExecute(PutWatchRequest request, ActionListener<PutWatchResponse> listener) {
try {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType());
@ -73,9 +66,6 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
boolean created = indexResponse.getResult() == DocWriteResponse.Result.CREATED;
if (localExecute(request) == false && watch.status().state().isActive()) {
triggerService.add(watch);
}
listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), created));
}, listener::onFailure));
}

View File

@ -41,7 +41,6 @@ import java.util.concurrent.ExecutorService;
import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
@ -406,43 +405,6 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, never()).start(any(ClusterState.class));
}
public void testWatcherPausesOnNonMasterWhenOldNodesHoldWatcherIndex() {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_2")
.add(newNode("node_1"))
.add(newNode("node_2"))
.add(newNode("oldNode", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
.build();
Index index = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(index, 0);
IndexRoutingTable routingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
.addShard(TestShardRouting.newShardRouting(shardId, "oldNode", false, STARTED)).build();
Settings.Builder indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6);
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(indexSettings);
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes)
.routingTable(RoutingTable.builder().add(routingTable).build())
.metaData(MetaData.builder().put(indexMetaDataBuilder))
.build();
WatcherState watcherState = randomFrom(WatcherState.values());
when(watcherService.state()).thenReturn(watcherState);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
if (watcherState == WatcherState.STARTED || watcherState == WatcherState.STARTING) {
verify(watcherService).pauseExecution(any(String.class));
}
}
public void testWatcherStartsOnlyOnMasterWhenOldNodesAreInCluster() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_1")
@ -459,29 +421,6 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService).start(any(ClusterState.class));
}
public void testDistributedWatchExecutionDisabledWith5xNodesInCluster() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1"))
.add(newNode("node_2", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
assertThat(WatcherLifeCycleService.isWatchExecutionDistributed(state), is(false));
}
public void testDistributedWatchExecutionEnabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_beta1, Version.CURRENT)))
.add(newNode("data_node_6x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_beta1, Version.CURRENT)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
assertThat(WatcherLifeCycleService.isWatchExecutionDistributed(state), is(true));
}
private static DiscoveryNode newNode(String nodeName) {
return newNode(nodeName, Version.CURRENT);
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.transport.action.put;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@ -14,9 +13,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchRequest;
import java.io.IOException;
import java.util.Base64;
import static org.hamcrest.Matchers.is;
public class PutWatchSerializationTests extends ESTestCase {
@ -61,25 +57,4 @@ public class PutWatchSerializationTests extends ESTestCase {
assertThat(readRequest.getSource(), is(request.getSource()));
assertThat(readRequest.xContentType(), is(XContentType.JSON));
}
public void testPutWatchSerializationXContentBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("ADwDAmlkDXsiZm9vIjoiYmFyIn0BAAAA");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
PutWatchRequest request = new PutWatchRequest();
request.readFrom(in);
assertEquals(XContentType.JSON, request.xContentType());
assertEquals("id", request.getId());
assertTrue(request.isActive());
assertEquals("{\"foo\":\"bar\"}", request.getSource().utf8ToString());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
request.writeTo(out);
assertArrayEquals(data, out.bytes().toBytesRef().bytes);
}
}
}
}

View File

@ -1,172 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import java.util.HashSet;
import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class WatcherTransportActionTests extends ESTestCase {
private MyTransportAction transportAction;
private ClusterService clusterService;
private TransportService transportService;
@Before
public void createTransportAction() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(EsExecutors.newDirectExecutorService());
clusterService = mock(ClusterService.class);
transportService = mock(TransportService.class);
transportAction = new MyTransportAction(transportService, threadPool, clusterService);
}
public void testThatRequestIsExecutedLocallyWithDistributedExecutionEnabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("data_node")
.add(newNode("master_node", Version.CURRENT))
.add(newNode("data_node", Version.CURRENT))
.build();
Index watchIndex = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
IndexRoutingTable routingTable = IndexRoutingTable.builder(watchIndex)
.addShard(TestShardRouting.newShardRouting(shardId, "data_node", true, STARTED)).build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes)
.routingTable(RoutingTable.builder().add(routingTable).build())
.build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
PlainActionFuture<MyActionResponse> future = PlainActionFuture.newFuture();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, future);
MyActionResponse response = future.actionGet(1000);
assertThat(response.request, is(request));
}
public void testThatRequestIsExecutedByMasterWithDistributedExecutionDisabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_alpha2)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
PlainActionFuture<MyActionResponse> future = PlainActionFuture.newFuture();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, future);
MyActionResponse response = future.actionGet(1000);
assertThat(response.request, is(request));
}
public void testThatRequestIsForwardedToMasterWithDistributedExecutionDisabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("non_master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_alpha2)))
.add(newNode("non_master_node", Version.CURRENT))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, PlainActionFuture.newFuture());
// dont wait for the future here, we would need to stub the action listener of the sendRequest call
ArgumentCaptor<DiscoveryNode> nodeArgumentCaptor = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(nodeArgumentCaptor.capture(), eq("my_action_name"), eq(request), any());
assertThat(nodeArgumentCaptor.getValue().getId(), is("master_node"));
}
private static DiscoveryNode newNode(String nodeName, Version version) {
return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), version);
}
private final class MyTransportAction extends WatcherTransportAction<MyActionRequest, MyActionResponse> {
MyTransportAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService) {
super(Settings.EMPTY, "my_action_name", transportService, threadPool, new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY), new XPackLicenseState(), clusterService, MyActionRequest::new,
MyActionResponse::new);
}
@Override
protected void masterOperation(MyActionRequest request, ClusterState state,
ActionListener<MyActionResponse> listener) throws Exception {
listener.onResponse(new MyActionResponse(request));
}
}
private static final class MyActionResponse extends ActionResponse {
MyActionRequest request;
MyActionResponse(MyActionRequest request) {
super();
this.request = request;
}
MyActionResponse() {}
}
private static final class MyActionRequest extends MasterNodeRequest<MyActionRequest> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}
}

View File

@ -15,12 +15,6 @@
"type" : "list",
"description" : "A comma-separated list of the action ids to be acked"
}
},
"params": {
"master_timeout": {
"type": "time",
"description": "Explicit operation timeout for connection to master node"
}
}
},
"body": null

View File

@ -11,12 +11,6 @@
"description" : "Watch ID",
"required" : true
}
},
"params": {
"master_timeout": {
"type": "time",
"description": "Explicit operation timeout for connection to master node"
}
}
},
"body": null

View File

@ -11,12 +11,6 @@
"description" : "Watch ID",
"required" : true
}
},
"params": {
"master_timeout": {
"type": "time",
"description": "Explicit operation timeout for connection to master node"
}
}
},
"body": null

View File

@ -12,12 +12,6 @@
"description" : "Watch ID",
"required" : true
}
},
"params": {
"master_timeout": {
"type": "time",
"description": "Explicit operation timeout for connection to master node"
}
}
},
"body": null

View File

@ -13,10 +13,6 @@
}
},
"params": {
"master_timeout": {
"type": "time",
"description": "Explicit operation timeout for connection to master node"
},
"active": {
"type": "boolean",
"description": "Specify whether the watch is in/active by default"

View File

@ -7,7 +7,6 @@
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger" : {

View File

@ -7,7 +7,6 @@
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger" : {

View File

@ -7,7 +7,6 @@
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger" : {

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {

View File

@ -7,7 +7,6 @@
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch1"
master_timeout: "40s"
body: >
{
"throttle_period" : "10s",

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch1"
master_timeout: "40s"
body: >
{
"trigger": {

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
active: false
body: >
{

View File

@ -8,7 +8,6 @@
catch: /Configured URL is empty/
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
@ -48,7 +47,6 @@
catch: /Malformed URL/
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch1"
master_timeout: "40s"
body: >
{
"trigger": {

View File

@ -16,7 +16,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
@ -65,7 +64,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
@ -114,7 +112,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {
@ -174,7 +171,6 @@ teardown:
- do:
xpack.watcher.put_watch:
id: "my_watch"
master_timeout: "40s"
body: >
{
"trigger": {