diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
index 39e2ac1fbc0..da8235b6c21 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
@@ -33,18 +34,18 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
-import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
-import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
+import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
-import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
-import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
+import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
import java.util.ArrayList;
@@ -99,8 +100,18 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor
- node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
+ node.isDataNode() &&
+ node.getVersion().onOrAfter(params.getVersion())
);
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
}
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
index c2105fa3eee..6773b3aec12 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
@@ -34,8 +34,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
-import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
import java.util.ArrayList;
import java.util.Arrays;
@@ -76,7 +76,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
- Version.V_7_2_0))
+ Version.V_7_4_0))
.add(new DiscoveryNode("current-data-node-with-2-tasks",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
@@ -123,6 +123,83 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
equalTo("past-data-node-1"));
}
+ public void testDoNotSelectOldNodes() {
+ MetaData.Builder metaData = MetaData.builder();
+ RoutingTable.Builder routingTable = RoutingTable.builder();
+ addIndices(metaData, routingTable);
+ PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
+ .addTask("transform-task-1",
+ TransformTaskParams.NAME,
+ new TransformTaskParams("transform-task-1", Version.CURRENT, null),
+ new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-task", ""));
+
+ PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
+
+ metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
+
+ DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
+ .add(new DiscoveryNode("old-data-node-1",
+ buildNewFakeTransportAddress(),
+ Collections.emptyMap(),
+ new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
+ Version.V_7_2_0))
+ .add(new DiscoveryNode("current-data-node-with-1-task",
+ buildNewFakeTransportAddress(),
+ Collections.emptyMap(),
+ new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
+ Version.CURRENT))
+ .add(new DiscoveryNode("non-data-node-1",
+ buildNewFakeTransportAddress(),
+ Collections.emptyMap(),
+ Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
+ Version.CURRENT));
+
+ ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
+ .nodes(nodes);
+ csBuilder.routingTable(routingTable.build());
+ csBuilder.metaData(metaData);
+
+ ClusterState cs = csBuilder.build();
+ Client client = mock(Client.class);
+ TransformAuditor mockAuditor = mock(TransformAuditor.class);
+ TransformConfigManager transformsConfigManager = new TransformConfigManager(client, xContentRegistry());
+ TransformCheckpointService transformCheckpointService = new TransformCheckpointService(client,
+ transformsConfigManager, mockAuditor);
+ ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
+ Collections.singleton(TransformTask.NUM_FAILURE_RETRIES_SETTING));
+ ClusterService clusterService = mock(ClusterService.class);
+ when(clusterService.getClusterSettings()).thenReturn(cSettings);
+ when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
+ TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor(client,
+ transformsConfigManager,
+ transformCheckpointService, mock(SchedulerEngine.class),
+ new TransformAuditor(client, ""),
+ mock(ThreadPool.class),
+ clusterService,
+ Settings.EMPTY);
+
+ // old-data-node-1 prevents assignment
+ assertNull(executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode());
+
+ // remove the old 7.2 node
+ nodes = DiscoveryNodes.builder()
+ .add(new DiscoveryNode("current-data-node-with-1-task",
+ buildNewFakeTransportAddress(),
+ Collections.emptyMap(),
+ new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
+ Version.CURRENT))
+ .add(new DiscoveryNode("non-data-node-1",
+ buildNewFakeTransportAddress(),
+ Collections.emptyMap(),
+ Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
+ Version.CURRENT));
+ csBuilder.nodes(nodes);
+ cs = csBuilder.build();
+
+ assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
+ equalTo("current-data-node-with-1-task"));
+ }
+
public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();