Make Persistent Tasks implementations version and feature aware (#31045)
With #31020 we introduced the ability for transport clients to indicate what features they support, so that we don't serialize objects to them that they don't support. This PR adapts the serialization logic of persistent tasks to be aware of those features and to not serialize tasks that aren't supported. A version check is also added for the future, where we may add new task implementations and need to be able to indicate that they shouldn't be serialized, both to nodes and to clients. As the implementation relies on the `PersistentTaskParams` interface, task params are no longer optional. That's acceptable, as all current implementations have them and we plan to make `PersistentTaskParams` more central in the future. Relates to #30731
parent 65d3f0efca · commit a7ceefe93f
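In rough terms, the gate this change introduces looks like the sketch below. It is a simplified illustration rather than the exact upstream code: only the version check is taken verbatim from the diff that follows, and hasRequiredFeature is a hypothetical helper standing in for however the receiving stream's advertised features are inspected.

// Sketch of the version- and feature-aware serialization gate (simplified).
// "hasRequiredFeature" is a hypothetical helper, not an actual Elasticsearch API.
static <T extends VersionedNamedWriteable & ClusterState.FeatureAware> boolean shouldSerialize(StreamOutput out, T custom) {
    // never send an object that the destination version cannot read
    if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
        return false;
    }
    // skip the object if the destination is a transport client that did not
    // advertise the feature this object requires
    if (custom.getRequiredFeature().isPresent() && hasRequiredFeature(out, custom.getRequiredFeature().get()) == false) {
        return false;
    }
    return true;
}

Both cluster state / metadata customs and persistent task params go through the same kind of check before being written to the wire, which is why the persistent tasks map further down is filtered before serialization.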
@@ -22,7 +22,6 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;

@@ -50,6 +49,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+ import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

@@ -122,7 +122,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
     * @param <T> the type of the custom
     * @return true if the custom should be serialized and false otherwise
     */
-   static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
+   static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
        if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
            return false;
        }

@@ -748,13 +748,13 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
        // filter out custom states not supported by the other node
        int numberOfCustoms = 0;
        for (final ObjectCursor<Custom> cursor : customs.values()) {
-           if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
+           if (FeatureAware.shouldSerialize(out, cursor.value)) {
                numberOfCustoms++;
            }
        }
        out.writeVInt(numberOfCustoms);
        for (final ObjectCursor<Custom> cursor : customs.values()) {
-           if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
+           if (FeatureAware.shouldSerialize(out, cursor.value)) {
                out.writeNamedWriteable(cursor.value);
            }
        }
@@ -19,17 +19,10 @@
package org.elasticsearch.cluster;

- import org.elasticsearch.Version;
- import org.elasticsearch.common.io.stream.NamedWriteable;
+ import org.elasticsearch.common.io.stream.VersionedNamedWriteable;

/**
-  * Diff that also support NamedWriteable interface
+  * Diff that also support {@link VersionedNamedWriteable} interface
 */
- public interface NamedDiffable<T> extends Diffable<T>, NamedWriteable {
-     /**
-      * The minimal version of the recipient this custom object can be sent to
-      */
-     default Version getMinimalSupportedVersion() {
-         return Version.CURRENT.minimumIndexCompatibilityVersion();
-     }
+ public interface NamedDiffable<T> extends Diffable<T>, VersionedNamedWriteable {
}
@@ -20,14 +20,15 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
- import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
+ import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
import java.util.ArrayList;

@@ -382,6 +383,11 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
        return readDiffFrom(Custom.class, TYPE, in);
    }
@@ -395,6 +395,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
        return readDiffFrom(Custom.class, TYPE, in);
    }
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.metadata;

+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.ParseField;

@@ -34,8 +35,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;

@@ -44,7 +43,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * A collection of tombstones for explicitly marking indices as deleted in the cluster state.

@@ -97,6 +95,11 @@ public final class IndexGraveyard implements MetaData.Custom {
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    @Override
    public EnumSet<MetaData.XContentContext> context() {
        return MetaData.API_AND_GATEWAY;
@@ -786,13 +786,13 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
        // filter out custom states not supported by the other node
        int numberOfCustoms = 0;
        for (final ObjectCursor<Custom> cursor : customs.values()) {
-           if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
+           if (FeatureAware.shouldSerialize(out, cursor.value)) {
                numberOfCustoms++;
            }
        }
        out.writeVInt(numberOfCustoms);
        for (final ObjectCursor<Custom> cursor : customs.values()) {
-           if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
+           if (FeatureAware.shouldSerialize(out, cursor.value)) {
                out.writeNamedWriteable(cursor.value);
            }
        }
@@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchParseException;
+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData.Custom;

@@ -103,6 +104,11 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    public RepositoriesMetaData(StreamInput in) throws IOException {
        RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()];
        for (int i = 0; i < repository.length; i++) {
@@ -0,0 +1,38 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.io.stream;

import org.elasticsearch.Version;

/**
 * A {@link NamedWriteable} that has a minimum version associated with it.
 */
public interface VersionedNamedWriteable extends NamedWriteable {

    /**
     * Returns the name of the writeable object
     */
    String getWriteableName();

    /**
     * The minimal version of the recipient this object can be sent to
     */
    Version getMinimalSupportedVersion();
}
@@ -19,6 +19,7 @@
package org.elasticsearch.ingest;

+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;

@@ -69,6 +70,11 @@ public final class IngestMetadata implements MetaData.Custom {
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    public Map<String, PipelineConfiguration> getPipelines() {
        return pipelines;
    }
@@ -35,7 +35,7 @@ public class NodePersistentTasksExecutor {
        this.threadPool = threadPool;
    }

-   public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
+   public <Params extends PersistentTaskParams> void executeTask(Params params,
                                                                  @Nullable Task.Status status,
                                                                  AllocatedPersistentTask task,
                                                                  PersistentTasksExecutor<Params> executor) {
@@ -19,12 +19,13 @@
package org.elasticsearch.persistent;

- import org.elasticsearch.common.io.stream.NamedWriteable;
+ import org.elasticsearch.cluster.ClusterState;
+ import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
 * Parameters used to start persistent task
 */
- public interface PersistentTaskParams extends NamedWriteable, ToXContentObject {
+ public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware {

}
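Since PersistentTaskParams now extends VersionedNamedWriteable and ClusterState.FeatureAware, every params implementation has to declare how far back it can be serialized and, optionally, which feature the receiver must advertise. A hypothetical implementation might look roughly like this (class name, feature name and body are invented for illustration and are not part of the commit):

import java.io.IOException;
import java.util.Optional;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.persistent.PersistentTaskParams;

// Hypothetical params class illustrating the contract; not upstream code.
public class MyTaskParams implements PersistentTaskParams {

    @Override
    public String getWriteableName() {
        return "my_task"; // must match the name the task executor is registered under
    }

    @Override
    public Version getMinimalSupportedVersion() {
        // oldest node or client version this object may be serialized to
        return Version.CURRENT.minimumCompatibilityVersion();
    }

    @Override
    public Optional<String> getRequiredFeature() {
        // feature a transport client has to advertise to receive this task
        return Optional.of("my-plugin");
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // write the task's fields here
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.startObject().endObject();
    }
}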
@@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
- import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;

@@ -65,7 +64,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
     * @param taskParams the task's parameters
     * @param listener   the listener that will be called when task is started
     */
-   public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams,
+   public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, Params taskParams,
                                                                           ActionListener<PersistentTask<?>> listener) {
        clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
            @Override

@@ -225,7 +224,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
     * @return a new {@link Assignment}
     */
    private <Params extends PersistentTaskParams> Assignment createAssignment(final String taskName,
-                                                                             final @Nullable Params taskParams,
+                                                                             final Params taskParams,
                                                                              final ClusterState currentState) {
        PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
@@ -49,8 +49,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

@@ -264,7 +264,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
        private final String id;
        private final long allocationId;
        private final String taskName;
-       @Nullable
        private final P params;
        @Nullable
        private final Status status;

@@ -314,7 +313,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
            id = in.readString();
            allocationId = in.readLong();
            taskName = in.readString();
-           params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
+           if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+               params = (P) in.readNamedWriteable(PersistentTaskParams.class);
+           } else {
+               params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
+           }
            status = in.readOptionalNamedWriteable(Task.Status.class);
            assignment = new Assignment(in.readOptionalString(), in.readString());
            allocationIdOnLastStatusUpdate = in.readOptionalLong();

@@ -325,7 +328,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
            out.writeString(id);
            out.writeLong(allocationId);
            out.writeString(taskName);
-           out.writeOptionalNamedWriteable(params);
+           if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+               out.writeNamedWriteable(params);
+           } else {
+               out.writeOptionalNamedWriteable(params);
+           }
            out.writeOptionalNamedWriteable(status);
            out.writeOptionalString(assignment.executorNode);
            out.writeString(assignment.explanation);

@@ -500,7 +507,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(lastAllocationId);
-       out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
+       Map<String, PersistentTask<?>> filteredTasks = tasks.values().stream()
+               .filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams()))
+               .collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
+       out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
    }

    public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
@@ -24,10 +24,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.tasks.Task;
- import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
+ import org.elasticsearch.tasks.Task;
+ import org.elasticsearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Predicate;

@@ -118,7 +118,7 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
     * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
     * indicate that the persistent task has finished.
     */
-   protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
+   protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status);

    public String getExecutor() {
        return executor;
@@ -21,7 +21,6 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;

@@ -69,7 +68,7 @@ public class PersistentTasksService extends AbstractComponent {
     */
    public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
                                                                       final String taskName,
-                                                                      final @Nullable Params taskParams,
+                                                                      final Params taskParams,
                                                                       final ActionListener<PersistentTask<Params>> listener) {
        @SuppressWarnings("unchecked")
        final ActionListener<PersistentTask<?>> wrappedListener =
@@ -18,6 +18,7 @@
 */
package org.elasticsearch.persistent;

+ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;

@@ -36,9 +37,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
+ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
- import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.io.IOException;
import java.util.Objects;

@@ -66,7 +67,6 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.

        private String taskId;

-       @Nullable
        private String taskName;

        private PersistentTaskParams params;

@@ -86,7 +86,11 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
            super.readFrom(in);
            taskId = in.readString();
            taskName = in.readString();
-           params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
+           if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+               params = in.readNamedWriteable(PersistentTaskParams.class);
+           } else {
+               params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
+           }
        }

        @Override

@@ -94,7 +98,11 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
            super.writeTo(out);
            out.writeString(taskId);
            out.writeString(taskName);
-           out.writeOptionalNamedWriteable(params);
+           if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+               out.writeNamedWriteable(params);
+           } else {
+               out.writeOptionalNamedWriteable(params);
+           }
        }

        @Override
@@ -383,6 +383,11 @@ public final class ScriptMetaData implements MetaData.Custom, Writeable, ToXCont
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    @Override
    public EnumSet<MetaData.XContentContext> context() {
        return MetaData.ALL_CONTEXTS;
@@ -308,6 +308,11 @@ public class ClusterChangedEventTests extends ESTestCase {
            return "2";
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return EnumSet.of(MetaData.XContentContext.GATEWAY);

@@ -324,6 +329,11 @@ public class ClusterChangedEventTests extends ESTestCase {
            return "1";
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return EnumSet.of(MetaData.XContentContext.GATEWAY);
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster;

+ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexGraveyard;

@@ -73,7 +74,8 @@ import static org.hamcrest.Matchers.instanceOf;
@ESIntegTestCase.ClusterScope(scope = TEST)
public class ClusterStateIT extends ESIntegTestCase {

-   public abstract static class Custom implements MetaData.Custom {
+   public abstract static
+   class Custom implements MetaData.Custom {

        private static final ParseField VALUE = new ParseField("value");

@@ -131,6 +133,11 @@ public class ClusterStateIT extends ESIntegTestCase {
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public Optional<String> getRequiredFeature() {
            return Optional.of("node");

@@ -155,6 +162,11 @@ public class ClusterStateIT extends ESIntegTestCase {
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        /*
         * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have
         * versus not requiring any feature. We use a field to make the random choice exactly once.
@@ -116,7 +116,7 @@ public class FeatureAwareTests extends ESTestCase {
            if (custom.getRequiredFeature().isPresent()) {
                out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
            }
-           assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+           assertTrue(FeatureAware.shouldSerialize(out, custom));
        }
        {
            final BytesStreamOutput out = new BytesStreamOutput();

@@ -126,7 +126,7 @@ public class FeatureAwareTests extends ESTestCase {
            if (custom.getRequiredFeature().isPresent() && randomBoolean()) {
                out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
            }
-           assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
+           assertFalse(FeatureAware.shouldSerialize(out, custom));
        }
    }
}

@@ -141,7 +141,7 @@ public class FeatureAwareTests extends ESTestCase {
            out.setVersion(afterVersion);
            assertTrue(custom.getRequiredFeature().isPresent());
            out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
-           assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+           assertTrue(FeatureAware.shouldSerialize(out, custom));
        }
        {
            // the feature is present and the client is a transport client

@@ -149,7 +149,7 @@ public class FeatureAwareTests extends ESTestCase {
            out.setVersion(afterVersion);
            assertTrue(custom.getRequiredFeature().isPresent());
            out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE)));
-           assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+           assertTrue(FeatureAware.shouldSerialize(out, custom));
        }
    }

@@ -161,14 +161,14 @@ public class FeatureAwareTests extends ESTestCase {
            // the feature is missing but we should serialize it anyway because the client is not a transport client
            final BytesStreamOutput out = new BytesStreamOutput();
            out.setVersion(afterVersion);
-           assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+           assertTrue(FeatureAware.shouldSerialize(out, custom));
        }
        {
            // the feature is missing and we should not serialize it because the client is a transport client
            final BytesStreamOutput out = new BytesStreamOutput();
            out.setVersion(afterVersion);
            out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE));
-           assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
+           assertFalse(FeatureAware.shouldSerialize(out, custom));
        }
    }
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster;

+ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.support.IndicesOptions;

@@ -37,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;

@@ -304,6 +304,11 @@ public class SimpleClusterStateIT extends ESIntegTestCase {
            return "test";
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeInt(value);
@@ -45,7 +45,6 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

@@ -133,7 +132,7 @@ public class ClusterSerializationTests extends ESAllocationTestCase {

        // serialize with current version
        BytesStreamOutput outStream = new BytesStreamOutput();
-       Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT);
+       Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
        outStream.setVersion(version);
        diffs.writeTo(outStream);
        StreamInput inStream = outStream.bytes().streamInput();
@@ -18,6 +18,7 @@
 */
package org.elasticsearch.cluster.service;

+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.io.stream.StreamOutput;

@@ -43,6 +44,11 @@ public class ClusterSerivceTests extends ESTestCase {
            return null;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public void writeTo(StreamOutput out) throws IOException {
@@ -239,6 +239,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return EnumSet.of(MetaData.XContentContext.GATEWAY, MetaData.XContentContext.SNAPSHOT);
@@ -492,6 +492,11 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return EnumSet.of(MetaData.XContentContext.GATEWAY);

@@ -510,6 +515,11 @@ public class GatewayMetaStateTests extends ESAllocationTestCase {
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return EnumSet.of(MetaData.XContentContext.GATEWAY);
@@ -19,6 +19,8 @@
package org.elasticsearch.persistent;

import org.elasticsearch.ResourceNotFoundException;
+ import org.elasticsearch.Version;
+ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;

@@ -26,8 +28,11 @@ import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
+ import org.elasticsearch.common.io.stream.BytesStreamOutput;
+ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
+ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;

@@ -43,13 +48,22 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTask
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;

import java.io.IOException;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Optional;
+ import java.util.Set;

import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
+ import static org.elasticsearch.test.VersionUtils.compatibleFutureVersion;
+ import static org.elasticsearch.test.VersionUtils.getFirstVersion;
+ import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
+ import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.equalTo;

public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase<Custom> {

@@ -228,7 +242,63 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
            assertEquals(changed, builder.isChanged());
            persistentTasks = builder.build();
        }
    }

    public void testMinVersionSerialization() throws IOException {
        PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();

        Version minVersion = getFirstVersion();
        final Version streamVersion = randomVersionBetween(random(), minVersion, getPreviousVersion(Version.CURRENT));
        tasks.addTask("test_compatible_version", TestPersistentTasksExecutor.NAME,
            new TestParams(null, randomVersionBetween(random(), minVersion, streamVersion),
                randomBoolean() ? Optional.empty() : Optional.of("test")),
            randomAssignment());
        tasks.addTask("test_incompatible_version", TestPersistentTasksExecutor.NAME,
            new TestParams(null, randomVersionBetween(random(), compatibleFutureVersion(streamVersion), Version.CURRENT),
                randomBoolean() ? Optional.empty() : Optional.of("test")),
            randomAssignment());
        final BytesStreamOutput out = new BytesStreamOutput();
        out.setVersion(streamVersion);
        Set<String> features = new HashSet<>();
        if (randomBoolean()) {
            features.add("test");
        }
        if (randomBoolean()) {
            features.add(TransportClient.TRANSPORT_CLIENT_FEATURE);
        }
        out.setFeatures(features);
        tasks.build().writeTo(out);

        final StreamInput input = out.bytes().streamInput();
        input.setVersion(streamVersion);
        PersistentTasksCustomMetaData read =
            new PersistentTasksCustomMetaData(new NamedWriteableAwareStreamInput(input, getNamedWriteableRegistry()));

        assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible_version")));
    }

    public void testFeatureSerialization() throws IOException {
        PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();

        Version minVersion = getFirstVersion();
        tasks.addTask("test_compatible", TestPersistentTasksExecutor.NAME,
            new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT),
                randomBoolean() ? Optional.empty() : Optional.of("existing")),
            randomAssignment());
        tasks.addTask("test_incompatible", TestPersistentTasksExecutor.NAME,
            new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), Optional.of("non_existing")),
            randomAssignment());
        final BytesStreamOutput out = new BytesStreamOutput();
        out.setVersion(Version.CURRENT);
        Set<String> features = new HashSet<>();
        features.add("existing");
        features.add(TransportClient.TRANSPORT_CLIENT_FEATURE);
        out.setFeatures(features);
        tasks.build().writeTo(out);

        PersistentTasksCustomMetaData read = new PersistentTasksCustomMetaData(
            new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry()));
        assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible")));
    }

    private Assignment randomAssignment() {
@@ -20,12 +20,12 @@ package org.elasticsearch.persistent;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
+ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
+ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
+ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
- import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
- import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
- import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;

import java.util.ArrayList;
import java.util.Collection;

@@ -35,8 +35,6 @@ import java.util.List;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1)
public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {

@@ -65,7 +63,7 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
            PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
            futures.add(future);
            taskIds[i] = UUIDs.base64UUID();
-           service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), future);
+           service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
        }

        for (int i = 0; i < numberOfTasks; i++) {
@@ -22,8 +22,8 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.persistent.StartPersistentTaskAction.Request;
- import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
+ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.test.AbstractStreamableTestCase;

import java.util.Collections;

@@ -32,17 +32,12 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas

    @Override
    protected Request createTestInstance() {
-       TestParams testParams;
-       if (randomBoolean()) {
-           testParams = new TestParams();
-           if (randomBoolean()) {
-               testParams.setTestParam(randomAlphaOfLengthBetween(1, 20));
-           }
-           if (randomBoolean()) {
-               testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
-           }
-       } else {
-           testParams = null;
-       }
+       TestParams testParams = new TestParams();
+       if (randomBoolean()) {
+           testParams.setTestParam(randomAlphaOfLengthBetween(1, 20));
+       }
+       if (randomBoolean()) {
+           testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
+       }
        return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams);
    }
@@ -19,6 +19,7 @@
package org.elasticsearch.persistent;

+ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;

@@ -49,6 +50,8 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
+ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;

@@ -57,8 +60,6 @@ import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
- import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
- import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.io.IOException;
import java.util.ArrayList;

@@ -67,6 +68,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@@ -120,6 +122,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
            REQUEST_PARSER.declareString(constructorArg(), new ParseField("param"));
        }

+       private final Version minVersion;
+       private final Optional<String> feature;
+
        private String executorNodeAttr = null;

        private String responseNode = null;

@@ -127,17 +132,25 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
        private String testParam = null;

        public TestParams() {
            this((String)null);
        }

        public TestParams(String testParam) {
            this(testParam, Version.CURRENT, Optional.empty());
        }

        public TestParams(String testParam, Version minVersion, Optional<String> feature) {
            this.testParam = testParam;
            this.minVersion = minVersion;
            this.feature = feature;
        }

        public TestParams(StreamInput in) throws IOException {
            executorNodeAttr = in.readOptionalString();
            responseNode = in.readOptionalString();
            testParam = in.readOptionalString();
            minVersion = Version.readVersion(in);
            feature = Optional.ofNullable(in.readOptionalString());
        }

        @Override

@@ -166,6 +179,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
            out.writeOptionalString(executorNodeAttr);
            out.writeOptionalString(responseNode);
            out.writeOptionalString(testParam);
+           Version.writeVersion(minVersion, out);
+           out.writeOptionalString(feature.orElse(null));
        }

        @Override

@@ -194,6 +209,16 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
        public int hashCode() {
            return Objects.hash(executorNodeAttr, responseNode, testParam);
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return minVersion;
+       }
+
+       @Override
+       public Optional<String> getRequiredFeature() {
+           return feature;
+       }
    }

    public static class Status implements Task.Status {
@@ -71,7 +71,7 @@ public class EnableAssignmentDeciderIT extends ESIntegTestCase {
        final CountDownLatch latch = new CountDownLatch(numberOfTasks);
        for (int i = 0; i < numberOfTasks; i++) {
            PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
-           service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(),
+           service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)),
                new ActionListener<PersistentTask<PersistentTaskParams>>() {
                    @Override
                    public void onResponse(PersistentTask<PersistentTaskParams> task) {

@@ -163,11 +163,4 @@ public class EnableAssignmentDeciderIT extends ESIntegTestCase {
        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
    }

-   /** Returns a random task parameter **/
-   private static PersistentTaskParams randomTaskParams() {
-       if (randomBoolean()) {
-           return null;
-       }
-       return new TestParams(randomAlphaOfLength(10));
-   }
}
@@ -21,9 +21,9 @@ package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
+ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;

@@ -1162,6 +1162,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        public static SnapshottableMetadata readFrom(StreamInput in) throws IOException {
            return readFrom(SnapshottableMetadata::new, in);
        }

@@ -1193,6 +1198,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        public static NonSnapshottableMetadata readFrom(StreamInput in) throws IOException {
            return readFrom(NonSnapshottableMetadata::new, in);
        }

@@ -1223,6 +1233,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        public static SnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException {
            return readFrom(SnapshottableGatewayMetadata::new, in);
        }

@@ -1253,6 +1268,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        public static NonSnapshottableGatewayMetadata readFrom(StreamInput in) throws IOException {
            return readFrom(NonSnapshottableGatewayMetadata::new, in);
        }

@@ -1284,6 +1304,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            return TYPE;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT;
+       }
+
        public static SnapshotableGatewayNoApiMetadata readFrom(StreamInput in) throws IOException {
            return readFrom(SnapshotableGatewayNoApiMetadata::new, in);
        }
@@ -19,22 +19,19 @@
package org.elasticsearch.test;

- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.Locale;
- import java.util.Map;
- import java.util.Optional;
- import java.util.Random;
- import java.util.SortedSet;
- import java.util.TreeSet;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;

+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Optional;
+ import java.util.Random;
+ import java.util.stream.Collectors;
+ import java.util.stream.Stream;

/** Utilities for selecting versions in tests */
public class VersionUtils {

@@ -228,6 +225,13 @@ public class VersionUtils {
        return opt.get();
    }

+   /** returns the first future compatible version */
+   public static Version compatibleFutureVersion(Version version) {
+       final Optional<Version> opt = ALL_VERSIONS.stream().filter(version::before).filter(v -> v.isCompatible(version)).findAny();
+       assert opt.isPresent() : "no future compatible version for " + version;
+       return opt.get();
+   }
+
    /** Returns the maximum {@link Version} that is compatible with the given version. */
    public static Version maxCompatibleVersion(Version version) {
        final List<Version> compatible = ALL_VERSIONS.stream().filter(version::isCompatible).filter(version::onOrBefore)
@@ -108,6 +108,11 @@ public class LicensesMetaData extends AbstractNamedDiffable<MetaData.Custom> imp
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    @Override
    public EnumSet<MetaData.XContentContext> context() {
        return EnumSet.of(MetaData.XContentContext.GATEWAY);
@@ -40,6 +40,7 @@ import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.license.Licensing;
import org.elasticsearch.license.XPackLicenseState;
+ import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.rest.RestController;

@@ -331,4 +332,12 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte

    }

+   public interface XPackPersistentTaskParams extends PersistentTaskParams {
+
+       @Override
+       default Optional<String> getRequiredFeature() {
+           return XPackClientPlugin.X_PACK_FEATURE;
+       }
+   }
+
}
@@ -23,10 +23,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
+ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
- import org.elasticsearch.persistent.PersistentTaskParams;

import java.io.IOException;
import java.util.Objects;

@@ -127,7 +127,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
        }
    }

-   public static class JobParams implements PersistentTaskParams {
+   public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {

        /** TODO Remove in 7.0.0 */
        public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");

@@ -237,6 +237,11 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
        public String toString() {
            return Strings.toString(this);
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT.minimumCompatibilityVersion();
+       }
    }

    public static class Response extends AcknowledgedResponse {
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.ElasticsearchParseException;
+ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;

@@ -24,10 +25,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
+ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
- import org.elasticsearch.persistent.PersistentTaskParams;

import java.io.IOException;
import java.util.Objects;

@@ -138,7 +139,7 @@ public class StartDatafeedAction extends Action<StartDatafeedAction.Request, Sta
        }
    }

-   public static class DatafeedParams implements PersistentTaskParams {
+   public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {

        public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(TASK_NAME, DatafeedParams::new);

@@ -231,6 +232,11 @@ public class StartDatafeedAction extends Action<StartDatafeedAction.Request, Sta
            return TASK_NAME;
        }

+       @Override
+       public Version getMinimalSupportedVersion() {
+           return Version.CURRENT.minimumCompatibilityVersion();
+       }
+
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeString(datafeedId);
@@ -5,6 +5,7 @@
 */
package org.elasticsearch.xpack.core.rollup.job;

+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.ParseField;

@@ -13,7 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
- import org.elasticsearch.persistent.PersistentTaskParams;
+ import org.elasticsearch.xpack.core.XPackPlugin;

import java.io.IOException;
import java.util.Collections;

@@ -25,7 +26,7 @@ import java.util.Objects;
 * It holds the config (RollupJobConfig) and a map of authentication headers. Only RollupJobConfig
 * is ever serialized to the user, so the headers should never leak
 */
- public class RollupJob extends AbstractDiffable<RollupJob> implements PersistentTaskParams {
+ public class RollupJob extends AbstractDiffable<RollupJob> implements XPackPlugin.XPackPersistentTaskParams {

    public static final String NAME = "xpack/rollup/job";

@@ -110,4 +111,9 @@ public class RollupJob extends AbstractDiffable<RollupJob> implements Persistent
    public int hashCode() {
        return Objects.hash(config, headers);
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.V_6_3_0;
+   }
}
@@ -5,6 +5,7 @@
 */
package org.elasticsearch.xpack.core.watcher;

+ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;

@@ -38,6 +39,11 @@ public class WatcherMetaData extends AbstractNamedDiffable<MetaData.Custom> impl
        return TYPE;
    }

+   @Override
+   public Version getMinimalSupportedVersion() {
+       return Version.CURRENT.minimumCompatibilityVersion();
+   }
+
    @Override
    public EnumSet<MetaData.XContentContext> context() {
        return EnumSet.of(MetaData.XContentContext.GATEWAY);