Watcher: Restore old WatcherStatsAction for BWC (elastic/x-pack-elasticsearch#2022)

To achieve backwards compatibility the easiest way is
to restore the old watcher stats, which are supposed to run
on the master node only.

The distributed watcher stats have been moved under the statsdist
package and the action name has been changed as well.

This way there is no need to have a serialization BWC layer,
we can just call different actions.

Note: With the current approach developers still need to change
their java applications if they try to receive watcher stats,
as by default we are now using the distributed stats in the
watcher client.

Original commit: elastic/x-pack-elasticsearch@49b3a45452
This commit is contained in:
Alexander Reelsen 2017-07-19 13:34:14 +02:00 committed by GitHub
parent aeed4cb3e4
commit 5e6c56bfc1
12 changed files with 461 additions and 58 deletions

View File

@ -136,6 +136,8 @@ import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchAction;
import org.elasticsearch.xpack.watcher.transport.actions.put.TransportPutWatchAction;
import org.elasticsearch.xpack.watcher.transport.actions.service.TransportWatcherServiceAction;
import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.OldTransportWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.OldWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.TransportWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
@ -413,6 +415,8 @@ public class Watcher implements ActionPlugin {
new ActionHandler<>(DeleteWatchAction.INSTANCE, TransportDeleteWatchAction.class),
new ActionHandler<>(GetWatchAction.INSTANCE, TransportGetWatchAction.class),
new ActionHandler<>(WatcherStatsAction.INSTANCE, TransportWatcherStatsAction.class),
new ActionHandler<>(OldWatcherStatsAction.INSTANCE,
OldTransportWatcherStatsAction.class),
new ActionHandler<>(AckWatchAction.INSTANCE, TransportAckWatchAction.class),
new ActionHandler<>(ActivateWatchAction.INSTANCE, TransportActivateWatchAction.class),
new ActionHandler<>(WatcherServiceAction.INSTANCE, TransportWatcherServiceAction.class),

View File

@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
/**
* Performs the stats operation required for the rolling upfrade from 5.x
*/
public class OldTransportWatcherStatsAction extends TransportMasterNodeAction<OldWatcherStatsRequest, OldWatcherStatsResponse> {
private final WatcherService watcherService;
private final ExecutionService executionService;
private final XPackLicenseState licenseState;
private final WatcherLifeCycleService lifeCycleService;
private final TriggerService triggerService;
@Inject
public OldTransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
ExecutionService executionService, XPackLicenseState licenseState,
WatcherLifeCycleService lifeCycleService, TriggerService triggerService) {
super(settings, OldWatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, OldWatcherStatsRequest::new);
this.watcherService = watcherService;
this.executionService = executionService;
this.licenseState = licenseState;
this.lifeCycleService = lifeCycleService;
this.triggerService = triggerService;
}
@Override
protected String executor() {
// cheap operation, no need to fork into another thread
return ThreadPool.Names.SAME;
}
@Override
protected void doExecute(Task task, OldWatcherStatsRequest request, ActionListener<OldWatcherStatsResponse> listener) {
if (licenseState.isWatcherAllowed()) {
super.doExecute(task, request, listener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.WATCHER));
}
}
@Override
protected OldWatcherStatsResponse newResponse() {
return new OldWatcherStatsResponse();
}
@Override
protected void masterOperation(OldWatcherStatsRequest request, ClusterState state,
ActionListener<OldWatcherStatsResponse> listener) throws ElasticsearchException {
OldWatcherStatsResponse statsResponse = new OldWatcherStatsResponse();
statsResponse.setWatcherState(watcherService.state());
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
statsResponse.setWatchesCount(triggerService.count());
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
statsResponse.setWatcherMetaData(lifeCycleService.watcherMetaData());
if (request.includeCurrentWatches()) {
statsResponse.setSnapshots(executionService.currentExecutions());
}
if (request.includeQueuedWatches()) {
statsResponse.setQueuedWatches(executionService.queuedWatches());
}
listener.onResponse(statsResponse);
}
@Override
protected ClusterBlockException checkBlock(OldWatcherStatsRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* This exists only for BWC against older 5.x nodes, which do not gather stats in a distributed fashion to support rolling upgrades
*/
public class OldWatcherStatsAction extends Action<OldWatcherStatsRequest, OldWatcherStatsResponse, OldWatcherStatsRequestBuilder> {
public static final OldWatcherStatsAction INSTANCE = new OldWatcherStatsAction();
public static final String NAME = "cluster:monitor/xpack/watcher/stats";
private OldWatcherStatsAction() {
super(NAME);
}
@Override
public OldWatcherStatsResponse newResponse() {
return new OldWatcherStatsResponse();
}
@Override
public OldWatcherStatsRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new OldWatcherStatsRequestBuilder(client);
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* The Request to get the watcher stats
*/
public class OldWatcherStatsRequest extends MasterNodeReadRequest<OldWatcherStatsRequest> {
private boolean includeCurrentWatches;
private boolean includeQueuedWatches;
public OldWatcherStatsRequest() {
}
public boolean includeCurrentWatches() {
return includeCurrentWatches;
}
public void includeCurrentWatches(boolean currentWatches) {
this.includeCurrentWatches = currentWatches;
}
public boolean includeQueuedWatches() {
return includeQueuedWatches;
}
public void includeQueuedWatches(boolean includeQueuedWatches) {
this.includeQueuedWatches = includeQueuedWatches;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
includeCurrentWatches = in.readBoolean();
includeQueuedWatches = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(includeCurrentWatches);
out.writeBoolean(includeQueuedWatches);
}
@Override
public String toString() {
return "watcher_stats";
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Watcher stats request builder.
*/
public class OldWatcherStatsRequestBuilder extends MasterNodeReadOperationRequestBuilder<OldWatcherStatsRequest, OldWatcherStatsResponse,
OldWatcherStatsRequestBuilder> {
public OldWatcherStatsRequestBuilder(ElasticsearchClient client) {
super(client, OldWatcherStatsAction.INSTANCE, new OldWatcherStatsRequest());
}
public OldWatcherStatsRequestBuilder setIncludeCurrentWatches(boolean includeCurrentWatches) {
request().includeCurrentWatches(includeCurrentWatches);
return this;
}
public OldWatcherStatsRequestBuilder setIncludeQueuedWatches(boolean includeQueuedWatches) {
request().includeQueuedWatches(includeQueuedWatches);
return this;
}
}

View File

@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.watcher.WatcherMetaData;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.execution.QueuedWatch;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionSnapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
public class OldWatcherStatsResponse extends ActionResponse implements ToXContentObject {
private long watchesCount;
private WatcherState watcherState;
private long threadPoolQueueSize;
private long threadPoolMaxSize;
private WatcherMetaData watcherMetaData;
private List<WatchExecutionSnapshot> snapshots;
private List<QueuedWatch> queuedWatches;
OldWatcherStatsResponse() {
}
/**
* @return The current execution thread pool queue size
*/
public long getThreadPoolQueueSize() {
return threadPoolQueueSize;
}
void setThreadPoolQueueSize(long threadPoolQueueSize) {
this.threadPoolQueueSize = threadPoolQueueSize;
}
/**
* @return The max number of threads in the execution thread pool
*/
public long getThreadPoolMaxSize() {
return threadPoolMaxSize;
}
void setThreadPoolMaxSize(long threadPoolMaxSize) {
this.threadPoolMaxSize = threadPoolMaxSize;
}
/**
* @return The number of watches currently registered in the system
*/
public long getWatchesCount() {
return watchesCount;
}
void setWatchesCount(long watchesCount) {
this.watchesCount = watchesCount;
}
/**
* @return The state of the watch service.
*/
public WatcherState getWatcherState() {
return watcherState;
}
void setWatcherState(WatcherState watcherServiceState) {
this.watcherState = watcherServiceState;
}
@Nullable
public List<WatchExecutionSnapshot> getSnapshots() {
return snapshots;
}
void setSnapshots(List<WatchExecutionSnapshot> snapshots) {
this.snapshots = snapshots;
}
@Nullable
public List<QueuedWatch> getQueuedWatches() {
return queuedWatches;
}
public void setQueuedWatches(List<QueuedWatch> queuedWatches) {
this.queuedWatches = queuedWatches;
}
public WatcherMetaData getWatcherMetaData() {
return watcherMetaData;
}
public void setWatcherMetaData(WatcherMetaData watcherMetaData) {
this.watcherMetaData = watcherMetaData;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
watchesCount = in.readLong();
threadPoolQueueSize = in.readLong();
threadPoolMaxSize = in.readLong();
watcherState = WatcherState.fromId(in.readByte());
if (in.readBoolean()) {
int size = in.readVInt();
snapshots = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
WatchExecutionSnapshot snapshot = new WatchExecutionSnapshot();
snapshot.readFrom(in);
snapshots.add(snapshot);
}
}
if (in.readBoolean()) {
int size = in.readVInt();
queuedWatches = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
QueuedWatch queuedWatch = new QueuedWatch();
queuedWatch.readFrom(in);
queuedWatches.add(queuedWatch);
}
}
watcherMetaData = new WatcherMetaData(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(watchesCount);
out.writeLong(threadPoolQueueSize);
out.writeLong(threadPoolMaxSize);
out.writeByte(watcherState.getId());
if (snapshots != null) {
out.writeBoolean(true);
out.writeVInt(snapshots.size());
for (WatchExecutionSnapshot snapshot : snapshots) {
snapshot.writeTo(out);
}
} else {
out.writeBoolean(false);
}
if (queuedWatches != null) {
out.writeBoolean(true);
out.writeVInt(queuedWatches.size());
for (QueuedWatch pending : this.queuedWatches) {
pending.writeTo(out);
}
} else {
out.writeBoolean(false);
}
watcherMetaData.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("watcher_state", watcherState.toString().toLowerCase(Locale.ROOT));
builder.field("watch_count", watchesCount);
builder.startObject("execution_thread_pool");
builder.field("queue_size", threadPoolQueueSize);
builder.field("max_size", threadPoolMaxSize);
builder.endObject();
if (snapshots != null) {
builder.startArray("current_watches");
for (WatchExecutionSnapshot snapshot : snapshots) {
snapshot.toXContent(builder, params);
}
builder.endArray();
}
if (queuedWatches != null) {
builder.startArray("queued_watches");
for (QueuedWatch queuedWatch : queuedWatches) {
queuedWatch.toXContent(builder, params);
}
builder.endArray();
}
watcherMetaData.toXContent(builder, params);
builder.endObject();
return builder;
}
}

View File

@ -14,7 +14,7 @@ import org.elasticsearch.client.ElasticsearchClient;
public class WatcherStatsAction extends Action<WatcherStatsRequest, WatcherStatsResponse, WatcherStatsRequestBuilder> {
public static final WatcherStatsAction INSTANCE = new WatcherStatsAction();
public static final String NAME = "cluster:monitor/xpack/watcher/stats";
public static final String NAME = "cluster:monitor/xpack/watcher/stats/dist";
private WatcherStatsAction() {
super(NAME);

View File

@ -10,7 +10,6 @@ import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceRequest;
import java.io.IOException;

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.stats;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
@ -41,63 +40,14 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
@Override
public void writeTo(StreamOutput out) throws IOException {
// if (out.getVersion().after(Version.V_6_0_0_alpha1_UNRELEASED)) {
super.writeTo(out);
out.writeBoolean(watcherMetaData.manuallyStopped());
/*
} else {
// BWC layer for older versions, this is not considered exact
// this mimics the behaviour of 5.x
out.writeLong(getNodes().stream().mapToLong(Node::getWatchesCount).sum());
out.writeLong(getNodes().stream().mapToLong(Node::getThreadPoolQueueSize).sum());
out.writeLong(getNodes().stream().mapToLong(Node::getThreadPoolMaxSize).sum());
// byte, watcher state, cannot be exact, just pick the first
out.writeByte(getNodes().get(0).getWatcherState().getId());
out.writeString(Version.CURRENT.toString()); // version
out.writeString(XPackBuild.CURRENT.shortHash()); // hash
out.writeString(XPackBuild.CURRENT.shortHash()); // short hash
out.writeString(XPackBuild.CURRENT.date()); // date
List<WatchExecutionSnapshot> snapshots = getNodes().stream().map(Node::getSnapshots)
.flatMap(List::stream)
.collect(Collectors.toList());
if (snapshots != null) {
out.writeBoolean(true);
out.writeVInt(snapshots.size());
for (WatchExecutionSnapshot snapshot : snapshots) {
snapshot.writeTo(out);
}
} else {
out.writeBoolean(false);
}
List<QueuedWatch> queuedWatches = getNodes().stream().map(Node::getQueuedWatches)
.flatMap(List::stream)
.collect(Collectors.toList());
if (queuedWatches != null) {
out.writeBoolean(true);
out.writeVInt(queuedWatches.size());
for (QueuedWatch pending : queuedWatches) {
pending.writeTo(out);
}
} else {
out.writeBoolean(false);
}
watcherMetaData.writeTo(out);
}
*/
super.writeTo(out);
out.writeBoolean(watcherMetaData.manuallyStopped());
}
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
watcherMetaData = new WatcherMetaData(in.readBoolean());
} else {
// TODO what to do here? create another BWC helping stuff here...
}
super.readFrom(in);
watcherMetaData = new WatcherMetaData(in.readBoolean());
}
@Override

View File

@ -43,7 +43,8 @@ public class BasicSecurityTests extends AbstractWatcherIntegrationTestCase {
String basicAuth = basicAuthHeaderValue("transport_client", SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING);
WatcherClient watcherClient = watcherClient().filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuth));
Exception e = expectThrows(Exception.class, () -> watcherClient.prepareWatcherStats().get());
assertThat(e.getMessage(), equalTo("action [cluster:monitor/xpack/watcher/stats] is unauthorized for user [transport_client]"));
assertThat(e.getMessage(),
equalTo("action [cluster:monitor/xpack/watcher/stats/dist] is unauthorized for user [transport_client]"));
}
public void testWatcherMonitorRole() throws Exception {
@ -54,7 +55,7 @@ public class BasicSecurityTests extends AbstractWatcherIntegrationTestCase {
.get();
fail("authentication failure should have occurred");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("action [cluster:monitor/xpack/watcher/stats] is unauthorized for user [test]"));
assertThat(e.getMessage(), equalTo("action [cluster:monitor/xpack/watcher/stats/dist] is unauthorized for user [test]"));
}
try {

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.action.stats;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.stats.OldWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.OldWatcherStatsRequest;
import org.elasticsearch.xpack.watcher.transport.actions.stats.OldWatcherStatsResponse;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class OldWatcherStatsTests extends AbstractWatcherIntegrationTestCase {
// these stats are not exposed in the watcherclient as they are only needed for the rolling upgrade
public void testPre6xWatcherStats() throws Exception {
OldWatcherStatsResponse response = client().execute(OldWatcherStatsAction.INSTANCE, new OldWatcherStatsRequest()).actionGet();
assertThat(response.getThreadPoolMaxSize(), is(greaterThanOrEqualTo(0L)));
assertThat(response.getThreadPoolQueueSize(), is(greaterThanOrEqualTo(0L)));
assertThat(response.getWatchesCount(), is(greaterThanOrEqualTo(0L)));
assertThat(response.getWatcherMetaData().manuallyStopped(), is(false));
}
}

View File

@ -106,6 +106,7 @@ cluster:admin/xpack/watcher/watch/ack
cluster:admin/xpack/watcher/watch/activate
cluster:monitor/xpack/watcher/watch/get
cluster:monitor/xpack/watcher/stats
cluster:monitor/xpack/watcher/stats/dist
internal:indices/admin/upgrade
cluster:admin/ingest/pipeline/delete
cluster:admin/ingest/pipeline/get