[7.5][Transform] prevent assignment if any node is older than 7.4 (#48055)
disable task assignment of transforms if any node uses version 7.2 or 7.2 (mixed cluster). fixes #48019
This commit is contained in:
parent
f0cb43fb96
commit
b2ce72850b
|
@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
import org.elasticsearch.ResourceNotFoundException;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.LatchedActionListener;
|
import org.elasticsearch.action.LatchedActionListener;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
@ -33,18 +34,18 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
|
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
|
||||||
|
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
|
||||||
|
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||||
import org.elasticsearch.xpack.transform.Transform;
|
import org.elasticsearch.xpack.transform.Transform;
|
||||||
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
|
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
|
||||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||||
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
|
|
||||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
|
||||||
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
|
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
|
||||||
|
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||||
|
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
|
||||||
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
|
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -99,8 +100,18 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
|
||||||
logger.debug(reason);
|
logger.debug(reason);
|
||||||
return new PersistentTasksCustomMetaData.Assignment(null, reason);
|
return new PersistentTasksCustomMetaData.Assignment(null, reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// see gh#48019 disable assignment if any node is using 7.2 or 7.3
|
||||||
|
if (clusterState.getNodes().getMinNodeVersion().before(Version.V_7_4_0)) {
|
||||||
|
String reason = "Not starting transform [" + params.getId() + "], " +
|
||||||
|
"because cluster contains nodes with version older than 7.4.0";
|
||||||
|
logger.debug(reason);
|
||||||
|
return new PersistentTasksCustomMetaData.Assignment(null, reason);
|
||||||
|
}
|
||||||
|
|
||||||
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
|
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
|
||||||
node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
|
node.isDataNode() &&
|
||||||
|
node.getVersion().onOrAfter(params.getVersion())
|
||||||
);
|
);
|
||||||
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
|
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||||
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
|
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
|
||||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||||
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
|
|
||||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||||
|
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -76,7 +76,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
||||||
buildNewFakeTransportAddress(),
|
buildNewFakeTransportAddress(),
|
||||||
Collections.emptyMap(),
|
Collections.emptyMap(),
|
||||||
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
|
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
|
||||||
Version.V_7_2_0))
|
Version.V_7_4_0))
|
||||||
.add(new DiscoveryNode("current-data-node-with-2-tasks",
|
.add(new DiscoveryNode("current-data-node-with-2-tasks",
|
||||||
buildNewFakeTransportAddress(),
|
buildNewFakeTransportAddress(),
|
||||||
Collections.emptyMap(),
|
Collections.emptyMap(),
|
||||||
|
@ -123,6 +123,83 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
||||||
equalTo("past-data-node-1"));
|
equalTo("past-data-node-1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDoNotSelectOldNodes() {
|
||||||
|
MetaData.Builder metaData = MetaData.builder();
|
||||||
|
RoutingTable.Builder routingTable = RoutingTable.builder();
|
||||||
|
addIndices(metaData, routingTable);
|
||||||
|
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
|
||||||
|
.addTask("transform-task-1",
|
||||||
|
TransformTaskParams.NAME,
|
||||||
|
new TransformTaskParams("transform-task-1", Version.CURRENT, null),
|
||||||
|
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-task", ""));
|
||||||
|
|
||||||
|
PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
|
||||||
|
|
||||||
|
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
|
||||||
|
|
||||||
|
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
|
||||||
|
.add(new DiscoveryNode("old-data-node-1",
|
||||||
|
buildNewFakeTransportAddress(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
|
||||||
|
Version.V_7_2_0))
|
||||||
|
.add(new DiscoveryNode("current-data-node-with-1-task",
|
||||||
|
buildNewFakeTransportAddress(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
|
||||||
|
Version.CURRENT))
|
||||||
|
.add(new DiscoveryNode("non-data-node-1",
|
||||||
|
buildNewFakeTransportAddress(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
|
||||||
|
Version.CURRENT));
|
||||||
|
|
||||||
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
.nodes(nodes);
|
||||||
|
csBuilder.routingTable(routingTable.build());
|
||||||
|
csBuilder.metaData(metaData);
|
||||||
|
|
||||||
|
ClusterState cs = csBuilder.build();
|
||||||
|
Client client = mock(Client.class);
|
||||||
|
TransformAuditor mockAuditor = mock(TransformAuditor.class);
|
||||||
|
TransformConfigManager transformsConfigManager = new TransformConfigManager(client, xContentRegistry());
|
||||||
|
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(client,
|
||||||
|
transformsConfigManager, mockAuditor);
|
||||||
|
ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
|
||||||
|
Collections.singleton(TransformTask.NUM_FAILURE_RETRIES_SETTING));
|
||||||
|
ClusterService clusterService = mock(ClusterService.class);
|
||||||
|
when(clusterService.getClusterSettings()).thenReturn(cSettings);
|
||||||
|
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
|
||||||
|
TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor(client,
|
||||||
|
transformsConfigManager,
|
||||||
|
transformCheckpointService, mock(SchedulerEngine.class),
|
||||||
|
new TransformAuditor(client, ""),
|
||||||
|
mock(ThreadPool.class),
|
||||||
|
clusterService,
|
||||||
|
Settings.EMPTY);
|
||||||
|
|
||||||
|
// old-data-node-1 prevents assignment
|
||||||
|
assertNull(executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode());
|
||||||
|
|
||||||
|
// remove the old 7.2 node
|
||||||
|
nodes = DiscoveryNodes.builder()
|
||||||
|
.add(new DiscoveryNode("current-data-node-with-1-task",
|
||||||
|
buildNewFakeTransportAddress(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
|
||||||
|
Version.CURRENT))
|
||||||
|
.add(new DiscoveryNode("non-data-node-1",
|
||||||
|
buildNewFakeTransportAddress(),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
|
||||||
|
Version.CURRENT));
|
||||||
|
csBuilder.nodes(nodes);
|
||||||
|
cs = csBuilder.build();
|
||||||
|
|
||||||
|
assertThat(executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
|
||||||
|
equalTo("current-data-node-with-1-task"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testVerifyIndicesPrimaryShardsAreActive() {
|
public void testVerifyIndicesPrimaryShardsAreActive() {
|
||||||
MetaData.Builder metaData = MetaData.builder();
|
MetaData.Builder metaData = MetaData.builder();
|
||||||
RoutingTable.Builder routingTable = RoutingTable.builder();
|
RoutingTable.Builder routingTable = RoutingTable.builder();
|
||||||
|
|
Loading…
Reference in New Issue