Watcher: Start watcher on master node only with mixed versions (elastic/x-pack-elasticsearch#1983)

When there are data or master nodes in the cluster, that are older
than ES 6.0 alpha3, then watcher will only start on the master node.

Changed all transport actions to be master node actions, as there is
already a method to decide to run locally, which we can piggyback on.

Original commit: elastic/x-pack-elasticsearch@65cecb6d69
This commit is contained in:
Alexander Reelsen 2017-07-12 20:58:47 +02:00 committed by GitHub
parent 51adbf0f51
commit e64cf23b13
15 changed files with 561 additions and 186 deletions

View File

@ -132,17 +132,24 @@ public class Upgrade implements ActionPlugin {
}
private static void preWatcherUpgrade(Client client, ActionListener<Boolean> listener) {
ActionListener<DeleteIndexTemplateResponse> triggeredWatchIndexTemplateListener = deleteIndexTemplateListener("triggered_watches",
listener, () -> listener.onResponse(true));
ActionListener<DeleteIndexTemplateResponse> watchIndexTemplateListener = deleteIndexTemplateListener("watches", listener,
() -> client.admin().indices().prepareDeleteTemplate("triggered_watches").execute(triggeredWatchIndexTemplateListener));
new WatcherClient(client).watcherStats(new WatcherStatsRequest(), ActionListener.wrap(
stats -> {
if (stats.watcherMetaData().manuallyStopped()) {
// don't start the watcher after upgrade
// don't start watcher after upgrade
listener.onResponse(false);
} else {
// stop the watcher
// stop watcher
new WatcherClient(client).watcherService(new WatcherServiceRequest().stop(), ActionListener.wrap(
stopResponse -> {
if (stopResponse.isAcknowledged()) {
listener.onResponse(true);
// delete old templates before indexing
client.admin().indices().prepareDeleteTemplate("watches").execute(watchIndexTemplateListener);
} else {
listener.onFailure(new IllegalStateException("unable to stop watcher service"));
}
@ -153,9 +160,6 @@ public class Upgrade implements ActionPlugin {
}
private static void postWatcherUpgrade(Client client, Boolean shouldStartWatcher, ActionListener<TransportResponse.Empty> listener) {
// if you are confused that these steps are numbered reversed, we are creating the action listeners first
// but only calling the deletion at the end of the method (inception style)
// step 3, after successful deletion of triggered watch index: start watcher
ActionListener<DeleteIndexResponse> deleteTriggeredWatchIndexResponse = ActionListener.wrap(deleteIndexResponse ->
startWatcherIfNeeded(shouldStartWatcher, client, listener), e -> {
if (e instanceof IndexNotFoundException) {
@ -165,25 +169,18 @@ public class Upgrade implements ActionPlugin {
}
});
// step 2, after acknowledged delete triggered watches template: delete triggered watches index
ActionListener<DeleteIndexTemplateResponse> triggeredWatchIndexTemplateListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
client.admin().indices().prepareDelete(".triggered_watches").execute(deleteTriggeredWatchIndexResponse);
} else {
listener.onFailure(new ElasticsearchException("Deleting triggered_watches template not acknowledged"));
}
client.admin().indices().prepareDelete(".triggered_watches").execute(deleteTriggeredWatchIndexResponse);
}
}, listener::onFailure);
// step 1, after acknowledged watches template deletion: delete triggered_watches template
ActionListener<DeleteIndexTemplateResponse> watchIndexTemplateListener = ActionListener.wrap(r -> {
private static ActionListener<DeleteIndexTemplateResponse> deleteIndexTemplateListener(String name, ActionListener<Boolean> listener,
Runnable runnable) {
return ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
client.admin().indices().prepareDeleteTemplate("triggered_watches").execute(triggeredWatchIndexTemplateListener);
runnable.run();
} else {
listener.onFailure(new ElasticsearchException("Deleting watches template not acknowledged"));
listener.onFailure(new ElasticsearchException("Deleting [{}] template was not acknowledged", name));
}
}, listener::onFailure);
client.admin().indices().prepareDeleteTemplate("watches").execute(watchIndexTemplateListener);
}
private static void startWatcherIfNeeded(Boolean shouldStartWatcher, Client client, ActionListener<TransportResponse.Empty> listener) {

View File

@ -192,18 +192,23 @@ final class WatcherIndexingListener extends AbstractComponent implements Indexin
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) {
try {
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (metaData == null) {
boolean isWatchExecutionDistributed = WatcherLifeCycleService.isWatchExecutionDistributed(event.state());
if (isWatchExecutionDistributed) {
if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) {
try {
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
if (metaData == null) {
configuration = INACTIVE;
} else {
checkWatchIndexHasChanged(metaData, event);
}
} catch (IllegalStateException e) {
logger.error("error loading watches index: [{}]", e.getMessage());
configuration = INACTIVE;
} else {
checkWatchIndexHasChanged(metaData, event);
}
} catch (IllegalStateException e) {
logger.error("error loading watches index: [{}]", e.getMessage());
configuration = INACTIVE;
}
} else {
configuration = INACTIVE;
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@ -36,17 +37,15 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
private final WatcherService watcherService;
private final ClusterService clusterService;
private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile WatcherMetaData watcherMetaData;
public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
super(settings);
this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
this.watcherService = watcherService;
this.clusterService = clusterService;
clusterService.addListener(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled.
@ -59,10 +58,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true));
}
public void start() {
start(clusterService.state(), true);
}
public void stop(String reason) {
watcherService.stop(reason);
}
@ -120,59 +115,89 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
if (currentWatcherStopped) {
executor.execute(() -> this.stop("watcher manually marked to shutdown in cluster state update, shutting down"));
} else {
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
// if there are old nodes in the cluster hosting the watch index shards, we cannot run distributed, only on the master node
boolean isDistributedWatchExecutionEnabled = isWatchExecutionDistributed(event.state());
if (isDistributedWatchExecutionEnabled) {
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
// no watcher index, time to pause, if we currently have shards here
if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) {
previousAllocationIds.set(Collections.emptyList());
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
// no watcher index, time to pause, as there are for sure no shards on this node
if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) {
previousAllocationIds.set(Collections.emptyList());
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
}
return;
}
return;
}
String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
// no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) {
if (previousAllocationIds.get().isEmpty() == false) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
// no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) {
if (previousAllocationIds.get().isEmpty() == false) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
}
return;
}
return;
}
List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);
List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);
if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
}
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME,
event.state().metaData());
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(watcherIndexMetaData);
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
} else {
logger.warn("Not starting watcher, the indices have not been upgraded yet. Please run the Upgrade API");
}
}
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME,
event.state().metaData());
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(watcherIndexMetaData);
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
Upgrade.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
} else {
if (event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
executor.execute(() -> start(event.state(), false));
}
} else {
logger.warn("Not starting watcher, the indices have not been upgraded yet. Please run the Upgrade API");
if (watcherService.state() == WatcherState.STARTED || watcherService.state() == WatcherState.STARTING) {
executor.execute(() -> watcherService.pauseExecution("Pausing watcher, cluster contains old nodes not supporting" +
" distributed watch execution"));
}
}
}
}
}
/**
* Checks if the preconditions are given to run watcher with distributed watch execution.
* The following requirements need to be fulfilled
*
* 1. The master node must run on a version greather than or equal 6.0
* 2. The nodes holding the watcher shards must run on a version greater than or equal 6.0
*
* @param state The cluster to check against
* @return true, if the above requirements are fulfilled, false otherwise
*/
public static boolean isWatchExecutionDistributed(ClusterState state) {
// short circuit if all nodes are on 6.x, should be standard after upgrade
return state.nodes().getMinNodeVersion().onOrAfter(Version.V_6_0_0_beta1);
}
public WatcherMetaData watcherMetaData() {
return watcherMetaData;
}

View File

@ -6,11 +6,16 @@
package org.elasticsearch.xpack.watcher.transport.actions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
@ -18,23 +23,56 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.function.Supplier;
public abstract class WatcherTransportAction<Request extends ActionRequest, Response extends ActionResponse>
extends HandledTransportAction<Request, Response> {
public abstract class WatcherTransportAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse>
extends TransportMasterNodeAction<Request, Response> {
protected final XPackLicenseState licenseState;
private final ClusterService clusterService;
private final Supplier<Response> response;
public WatcherTransportAction(Settings settings, String actionName, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
XPackLicenseState licenseState, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
XPackLicenseState licenseState, ClusterService clusterService, Supplier<Request> request,
Supplier<Response> response) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request);
this.licenseState = licenseState;
this.clusterService = clusterService;
this.response = response;
}
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
protected Response newResponse() {
return response.get();
}
protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception;
protected boolean localExecute(Request request) {
return WatcherLifeCycleService.isWatchExecutionDistributed(clusterService.state());
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
IndexMetaData index = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (index != null) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getIndex().getName());
} else {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
}
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
if (licenseState.isWatcherAllowed()) {
super.doExecute(task, request, listener);
} else {

View File

@ -12,8 +12,10 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@ -44,16 +46,17 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
Watch.Parser parser, InternalClient client) {
Watch.Parser parser, InternalClient client, ClusterService clusterService) {
super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, AckWatchRequest::new);
licenseState, clusterService, AckWatchRequest::new, AckWatchResponse::new);
this.clock = clock;
this.parser = parser;
this.client = client;
}
@Override
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
protected void masterOperation(AckWatchRequest request, ClusterState state,
ActionListener<AckWatchResponse> listener) throws Exception {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

View File

@ -12,8 +12,10 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -23,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
@ -42,21 +45,25 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
private final Clock clock;
private final Watch.Parser parser;
private final Client client;
private final TriggerService triggerService;
@Inject
public TransportActivateWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Clock clock,
XPackLicenseState licenseState, Watch.Parser parser,
InternalClient client) {
XPackLicenseState licenseState, Watch.Parser parser, ClusterService clusterService,
InternalClient client, TriggerService triggerService) {
super(settings, ActivateWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, ActivateWatchRequest::new);
licenseState, clusterService, ActivateWatchRequest::new, ActivateWatchResponse::new);
this.clock = clock;
this.parser = parser;
this.client = client;
this.triggerService = triggerService;
}
@Override
protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
protected void masterOperation(ActivateWatchRequest request, ClusterState state, ActionListener<ActivateWatchResponse> listener)
throws Exception {
try {
DateTime now = new DateTime(clock.millis(), UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
@ -77,6 +84,13 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
XContentType.JSON);
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
if (localExecute(request)) {
if (watch.status().state().isActive()) {
triggerService.add(watch);
} else {
triggerService.remove(watch.id());
}
}
listener.onResponse(new ActivateWatchResponse(watch.status()));
} else {
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
@ -100,4 +114,5 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
return builder;
}
}
}

View File

@ -12,8 +12,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -60,9 +62,10 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ExecutionService executionService, Clock clock, XPackLicenseState licenseState,
Watch.Parser watchParser, InternalClient client, TriggerService triggerService) {
Watch.Parser watchParser, InternalClient client, TriggerService triggerService,
ClusterService clusterService) {
super(settings, ExecuteWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, ExecuteWatchRequest::new);
licenseState, clusterService, ExecuteWatchRequest::new, ExecuteWatchResponse::new);
this.executionService = executionService;
this.clock = clock;
this.triggerService = triggerService;
@ -71,7 +74,8 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
@Override
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
protected void masterOperation(ExecuteWatchRequest request, ClusterState state,
ActionListener<ExecuteWatchResponse> listener) throws Exception {
if (request.getId() != null) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
@ -135,4 +139,5 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
});
}
}

View File

@ -5,11 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.get;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
@ -59,12 +61,20 @@ public class GetWatchRequest extends MasterNodeReadRequest<GetWatchRequest> {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong();
in.readByte();
}
id = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeLong(1);
out.writeByte(VersionType.INTERNAL.getValue());
}
out.writeString(id);
}

View File

@ -9,8 +9,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -39,16 +41,17 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Inject
public TransportGetWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, XPackLicenseState licenseState,
Watch.Parser parser, Clock clock, InternalClient client) {
Watch.Parser parser, Clock clock, InternalClient client, ClusterService clusterService) {
super(settings, GetWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, GetWatchRequest::new);
licenseState, clusterService, GetWatchRequest::new, GetWatchResponse::new);
this.parser = parser;
this.clock = clock;
this.client = client;
}
@Override
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
protected void masterOperation(GetWatchRequest request, ClusterState state,
ActionListener<GetWatchResponse> listener) throws Exception {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

View File

@ -10,7 +10,9 @@ import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -22,6 +24,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
@ -36,20 +39,24 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
private final Clock clock;
private final Watch.Parser parser;
private final InternalClient client;
private final TriggerService triggerService;
@Inject
public TransportPutWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
Watch.Parser parser, InternalClient client) {
Watch.Parser parser, InternalClient client, ClusterService clusterService,
TriggerService triggerService) {
super(settings, PutWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, PutWatchRequest::new);
licenseState, clusterService, PutWatchRequest::new, PutWatchResponse::new);
this.clock = clock;
this.parser = parser;
this.client = client;
this.triggerService = triggerService;
}
@Override
protected void doExecute(final PutWatchRequest request, final ActionListener<PutWatchResponse> listener) {
protected void masterOperation(PutWatchRequest request, ClusterState state,
ActionListener<PutWatchResponse> listener) throws Exception {
try {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType());
@ -66,6 +73,9 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
boolean created = indexResponse.getResult() == DocWriteResponse.Result.CREATED;
if (localExecute(request) == false && watch.status().state().isActive()) {
triggerService.add(watch);
}
listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), created));
}, listener::onFailure));
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.WatcherIndexingListener.Configuration;
import org.elasticsearch.xpack.watcher.WatcherIndexingListener.ShardAllocationConfiguration;
@ -651,6 +652,20 @@ public class WatcherIndexingListenerTests extends ESTestCase {
assertThat(listener.getConfiguration(), is(INACTIVE));
}
public void testThatIndexingListenerIsInactiveWhenWatchExecutionIsNotDistributed() throws Exception {
listener.setConfiguration(INACTIVE);
Version oldVersion = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_alpha2);
DiscoveryNode node = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), oldVersion);
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(node))
.build();
listener.clusterChanged(new ClusterChangedEvent("something", state, state));
assertThat(listener.getConfiguration(), is(INACTIVE));
}
//
// helper methods
//

View File

@ -27,6 +27,7 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -40,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
@ -53,7 +55,6 @@ import static org.mockito.Mockito.when;
public class WatcherLifeCycleServiceTests extends ESTestCase {
private ClusterService clusterService;
private WatcherService watcherService;
private WatcherLifeCycleService lifeCycleService;
@ -62,7 +63,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
clusterService = mock(ClusterService.class);
ClusterService clusterService = mock(ClusterService.class);
Answer<Object> answer = invocationOnMock -> {
AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1];
updateTask.onAllNodesAcked(null);
@ -116,11 +117,10 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.routingTable(RoutingTable.builder().add(watchRoutingTable).build())
.build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.validate(clusterState)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.start();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState));
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, never()).stop(anyString());
@ -159,11 +159,11 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(false);
lifeCycleService.start();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop(anyString());
}
@ -172,10 +172,9 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPING);
lifeCycleService.start();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", clusterState, clusterState));
verify(watcherService, never()).validate(any(ClusterState.class));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop(anyString());
@ -403,8 +402,88 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, never()).start(any(ClusterState.class));
}
public void testWatcherPausesOnNonMasterWhenOldNodesHoldWatcherIndex() {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_2")
.add(newNode("node_1"))
.add(newNode("node_2"))
.add(newNode("oldNode", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
.build();
Index index = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(index, 0);
IndexRoutingTable routingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
.addShard(TestShardRouting.newShardRouting(shardId, "oldNode", false, STARTED)).build();
Settings.Builder indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6);
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(indexSettings);
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes)
.routingTable(RoutingTable.builder().add(routingTable).build())
.metaData(MetaData.builder().put(indexMetaDataBuilder))
.build();
WatcherState watcherState = randomFrom(WatcherState.values());
when(watcherService.state()).thenReturn(watcherState);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
if (watcherState == WatcherState.STARTED || watcherState == WatcherState.STARTING) {
verify(watcherService).pauseExecution(any(String.class));
}
}
public void testWatcherStartsOnlyOnMasterWhenOldNodesAreInCluster() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1"))
.add(newNode("node_2"))
.add(newNode("oldNode", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(watcherService.validate(eq(state))).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService).start(any(ClusterState.class));
}
public void testDistributedWatchExecutionDisabledWith5xNodesInCluster() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1"))
.add(newNode("node_2", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
assertThat(WatcherLifeCycleService.isWatchExecutionDistributed(state), is(false));
}
public void testDistributedWatchExecutionEnabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_beta1, Version.CURRENT)))
.add(newNode("data_node_6x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_beta1, Version.CURRENT)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
assertThat(WatcherLifeCycleService.isWatchExecutionDistributed(state), is(true));
}
private static DiscoveryNode newNode(String nodeName) {
return newNode(nodeName, Version.CURRENT);
}
private static DiscoveryNode newNode(String nodeName, Version version) {
return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
new HashSet<>(asList(DiscoveryNode.Role.values())), version);
}
}

View File

@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import java.util.HashSet;
import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class WatcherTransportActionTests extends ESTestCase {
private MyTransportAction transportAction;
private ClusterService clusterService;
private TransportService transportService;
@Before
public void createTransportAction() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(EsExecutors.newDirectExecutorService());
clusterService = mock(ClusterService.class);
transportService = mock(TransportService.class);
transportAction = new MyTransportAction(transportService, threadPool, clusterService);
}
public void testThatRequestIsExecutedLocallyWithDistributedExecutionEnabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("data_node")
.add(newNode("master_node", Version.CURRENT))
.add(newNode("data_node", Version.CURRENT))
.build();
Index watchIndex = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
IndexRoutingTable routingTable = IndexRoutingTable.builder(watchIndex)
.addShard(TestShardRouting.newShardRouting(shardId, "data_node", true, STARTED)).build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes)
.routingTable(RoutingTable.builder().add(routingTable).build())
.build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
PlainActionFuture<MyActionResponse> future = PlainActionFuture.newFuture();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, future);
MyActionResponse response = future.actionGet(1000);
assertThat(response.request, is(request));
}
public void testThatRequestIsExecutedByMasterWithDistributedExecutionDisabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_alpha2)))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
PlainActionFuture<MyActionResponse> future = PlainActionFuture.newFuture();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, future);
MyActionResponse response = future.actionGet(1000);
assertThat(response.request, is(request));
}
public void testThatRequestIsForwardedToMasterWithDistributedExecutionDisabled() throws Exception {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.masterNodeId("master_node").localNodeId("non_master_node")
.add(newNode("master_node", VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_alpha2)))
.add(newNode("non_master_node", Version.CURRENT))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(clusterService.state()).thenReturn(state);
when(clusterService.localNode()).thenReturn(state.nodes().getLocalNode());
MyActionRequest request = new MyActionRequest();
Task task = request.createTask(1, "type", "action", new TaskId("parent", 0));
transportAction.doExecute(task, request, PlainActionFuture.newFuture());
// dont wait for the future here, we would need to stub the action listener of the sendRequest call
ArgumentCaptor<DiscoveryNode> nodeArgumentCaptor = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(nodeArgumentCaptor.capture(), eq("my_action_name"), eq(request), any());
assertThat(nodeArgumentCaptor.getValue().getId(), is("master_node"));
}
private static DiscoveryNode newNode(String nodeName, Version version) {
return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), version);
}
private final class MyTransportAction extends WatcherTransportAction<MyActionRequest, MyActionResponse> {
MyTransportAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService) {
super(Settings.EMPTY, "my_action_name", transportService, threadPool, new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY), new XPackLicenseState(), clusterService, MyActionRequest::new,
MyActionResponse::new);
}
@Override
protected void masterOperation(MyActionRequest request, ClusterState state,
ActionListener<MyActionResponse> listener) throws Exception {
listener.onResponse(new MyActionResponse(request));
}
}
private static final class MyActionResponse extends ActionResponse {
MyActionRequest request;
MyActionResponse(MyActionRequest request) {
super();
this.request = request;
}
MyActionResponse() {}
}
private static final class MyActionRequest extends MasterNodeRequest<MyActionRequest> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}
}

View File

@ -34,6 +34,7 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -162,63 +163,59 @@ public class FullClusterRestartIT extends ESRestTestCase {
logger.info("Done creating watcher-related indices");
} else {
logger.info("testing against {}", oldClusterVersion);
if (oldClusterVersion.before(Version.V_5_6_0)) {
waitForYellow(".watches,bwc_watch_index,.watcher-history*");
waitForYellow(".watches,bwc_watch_index,.watcher-history*");
logger.info("checking that upgrade procedure on the new cluster is required");
Map<String, Object> response = toMap(client().performRequest("GET", "/_xpack/migration/assistance"));
logger.info(response);
logger.info("checking that upgrade procedure on the new cluster is required");
Map<String, Object> response = toMap(client().performRequest("GET", "/_xpack/migration/assistance"));
logger.info(response);
@SuppressWarnings("unchecked") Map<String, Object> indices = (Map<String, Object>) response.get("indices");
assertThat(indices.entrySet(), hasSize(1));
assertThat(indices.get(".watches"), notNullValue());
@SuppressWarnings("unchecked") Map<String, Object> index = (Map<String, Object>) indices.get(".watches");
assertThat(index.get("action_required"), equalTo("upgrade"));
@SuppressWarnings("unchecked") Map<String, Object> indices = (Map<String, Object>) response.get("indices");
assertThat(indices.entrySet(), hasSize(1));
assertThat(indices.get(".watches"), notNullValue());
@SuppressWarnings("unchecked") Map<String, Object> index = (Map<String, Object>) indices.get(".watches");
assertThat(index.get("action_required"), equalTo("upgrade"));
logger.info("starting upgrade procedure on the new cluster");
logger.info("starting upgrade procedure on the new cluster");
Map<String, Object> upgradeResponse = toMap(client().performRequest("POST", "_xpack/migration/upgrade/.watches"));
assertThat(upgradeResponse.get("timed_out"), equalTo(Boolean.FALSE));
// we posted 3 watches, but monitoring can post a few more
assertThat((int)upgradeResponse.get("total"), greaterThanOrEqualTo(3));
Map<String, String> params = Collections.singletonMap("error_trace", "true");
Map<String, Object> upgradeResponse = toMap(client().performRequest("POST", "_xpack/migration/upgrade/.watches", params));
assertThat(upgradeResponse.get("timed_out"), equalTo(Boolean.FALSE));
// we posted 3 watches, but monitoring can post a few more
assertThat((int)upgradeResponse.get("total"), greaterThanOrEqualTo(3));
logger.info("checking that upgrade procedure on the new cluster is required again");
Map<String, Object> responseAfter = toMap(client().performRequest("GET", "/_xpack/migration/assistance"));
@SuppressWarnings("unchecked") Map<String, Object> indicesAfter = (Map<String, Object>) responseAfter.get("indices");
assertThat(indicesAfter.entrySet(), empty());
logger.info("checking that upgrade procedure on the new cluster is required again");
Map<String, Object> responseAfter = toMap(client().performRequest("GET", "/_xpack/migration/assistance"));
@SuppressWarnings("unchecked") Map<String, Object> indicesAfter = (Map<String, Object>) responseAfter.get("indices");
assertThat(indicesAfter.entrySet(), empty());
// Wait for watcher to actually start....
Map<String, Object> startWatchResponse = toMap(client().performRequest("POST", "_xpack/watcher/_start"));
assertThat(startWatchResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// Wait for watcher to actually start....
Map<String, Object> startWatchResponse = toMap(client().performRequest("POST", "_xpack/watcher/_start"));
assertThat(startWatchResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertBusy(() -> {
Map<String, Object> statsWatchResponse = toMap(client().performRequest("GET", "_xpack/watcher/stats"));
@SuppressWarnings("unchecked")
List<Object> states = ((List<Object>) statsWatchResponse.get("stats"))
.stream().map(o -> ((Map<String, Object>) o).get("watcher_state")).collect(Collectors.toList());
assertThat(states, everyItem(is("started")));
});
try {
assertOldTemplatesAreDeleted();
assertWatchIndexContentsWork();
assertBasicWatchInteractions();
} finally {
/* Shut down watcher after every test because watcher can be a bit finicky about shutting down when the node shuts
* down. This makes super sure it shuts down *and* causes the test to fail in a sensible spot if it doesn't shut down.
*/
Map<String, Object> stopWatchResponse = toMap(client().performRequest("POST", "_xpack/watcher/_stop"));
assertThat(stopWatchResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertBusy(() -> {
Map<String, Object> statsWatchResponse = toMap(client().performRequest("GET", "_xpack/watcher/stats"));
Map<String, Object> statsStoppedWatchResponse = toMap(client().performRequest("GET", "_xpack/watcher/stats"));
@SuppressWarnings("unchecked")
List<Object> states = ((List<Object>) statsWatchResponse.get("stats"))
List<Object> states = ((List<Object>) statsStoppedWatchResponse.get("stats"))
.stream().map(o -> ((Map<String, Object>) o).get("watcher_state")).collect(Collectors.toList());
assertThat(states, everyItem(is("started")));
assertThat(states, everyItem(is("stopped")));
});
try {
assertOldTemplatesAreDeleted();
assertWatchIndexContentsWork();
assertBasicWatchInteractions();
} finally {
/* Shut down watcher after every test because watcher can be a bit finicky about shutting down when the node shuts
* down. This makes super sure it shuts down *and* causes the test to fail in a sensible spot if it doesn't shut down.
*/
Map<String, Object> stopWatchResponse = toMap(client().performRequest("POST", "_xpack/watcher/_stop"));
assertThat(stopWatchResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertBusy(() -> {
Map<String, Object> statsStoppedWatchResponse = toMap(client().performRequest("GET", "_xpack/watcher/stats"));
@SuppressWarnings("unchecked")
List<Object> states = ((List<Object>) statsStoppedWatchResponse.get("stats"))
.stream().map(o -> ((Map<String, Object>) o).get("watcher_state")).collect(Collectors.toList());
assertThat(states, everyItem(is("stopped")));
});
}
} else {
// TODO: remove when 5.6 is fixed
logger.info("Skipping 5.6.0 for now");
}
}
}

View File

@ -5,9 +5,7 @@
*/
package org.elasticsearch.upgrades;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
@ -18,11 +16,12 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.junit.Before;
import java.io.IOException;
@ -35,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
@ -131,9 +129,6 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
});
}
// we have to have finish the upgrade API first to make this test work, so we can call it instead of
// https://github.com/elastic/x-pack-elasticsearch/issues/1303
@AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/pull/1603")
public void testWatchCrudApis() throws IOException {
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
@ -146,23 +141,41 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
StringEntity entity = new StringEntity(bytesReference.utf8ToString(),
ContentType.APPLICATION_JSON);
executeAgainstAllNodes(client -> {
fakeUpgradeFrom5x(client);
// execute upgrade if new nodes are in the cluster
executeUpgradeIfClusterHasNewNode();
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch", Collections.emptyMap(), entity));
assertOK(client.performRequest("GET", "/_xpack/watcher/watch/my-watch"));
assertOK(client.performRequest("POST", "/_xpack/watcher/watch/my-watch/_execute"));
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_deactivate"));
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_activate"));
executeAgainstAllNodes(client -> {
Map<String, String> params = Collections.singletonMap("error_trace", "true");
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch", params, entity));
assertOK(client.performRequest("GET", "/_xpack/watcher/watch/my-watch", params));
assertOK(client.performRequest("POST", "/_xpack/watcher/watch/my-watch/_execute", params));
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_deactivate", params));
assertOK(client.performRequest("PUT", "/_xpack/watcher/watch/my-watch/_activate", params));
});
}
public void executeUpgradeIfClusterHasNewNode()
throws IOException {
HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
if (newHosts.length > 0) {
try (RestClient client = buildClient(restClientSettings(), newHosts)) {
logger.info("checking that upgrade procedure on the new cluster is required");
Map<String, Object> response = toMap(client().performRequest("GET", "/_xpack/migration/assistance"));
String action = ObjectPath.evaluate(response, "indices.\\.watches.action_required");
if ("upgrade".equals(action)) {
client.performRequest("POST", "_xpack/migration/upgrade/.watches");
}
}
}
}
public void executeAgainstAllNodes(CheckedConsumer<RestClient, IOException> consumer)
throws IOException {
HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
HttpHost[] bwcHosts = nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
logger.info("# of bwc nodes [{}], number of new nodes [{}]", Arrays.asList(bwcHosts), Arrays.asList(newHosts));
logger.info("# of bwc nodes [{}], number of new nodes [{}], master node [{}]", Arrays.asList(bwcHosts), Arrays.asList(newHosts),
nodes.getMaster().getPublishAddress());
assertTrue("No nodes in cluster, cannot run any tests", newHosts.length > 0 || bwcHosts.length > 0);
if (newHosts.length > 0) {
@ -182,22 +195,6 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
}
// This is needed for fake the upgrade from 5.x to 6.0, where a new watches template is created, that contains mapping for the status
// field, as _status will be moved to status
// This can be removed once the upgrade API supports everything
private void fakeUpgradeFrom5x(RestClient client) throws IOException {
BytesReference mappingJson = jsonBuilder().startObject().startObject("properties").startObject("status")
.field("type", "object")
.field("enabled", false)
.field("dynamic", true)
.endObject().endObject().endObject()
.bytes();
HttpEntity data = new ByteArrayEntity(mappingJson.toBytesRef().bytes, ContentType.APPLICATION_JSON);
Response response = client.performRequest("PUT", "/" + Watch.INDEX + "/_mapping/" + Watch.DOC_TYPE, Collections.emptyMap(), data);
assertOK(response);
}
private Nodes buildNodeAndVersions() throws IOException {
Response response = client().performRequest("GET", "_nodes");
ObjectPath objectPath = ObjectPath.createFromResponse(response);
@ -309,4 +306,8 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
'}';
}
}
static Map<String, Object> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}
}