Introduce formal role for remote cluster client (#54138)
This commit introduce a formal role for identifying nodes that are capable of making connections to remote clusters. Relates #53924
This commit is contained in:
parent
75e80e207a
commit
381d7586e4
|
@ -7,7 +7,7 @@
|
|||
- match:
|
||||
$body: |
|
||||
/ #ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
|
||||
^ ((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)?\s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmt]{1,5}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
^ ((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)?\s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmrt]{1,6}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
|
||||
- do:
|
||||
cat.nodes:
|
||||
|
@ -16,7 +16,7 @@
|
|||
- match:
|
||||
$body: |
|
||||
/^ ip \s+ heap\.percent \s+ ram\.percent \s+ cpu \s+ load_1m \s+ load_5m \s+ load_15m \s+ node\.role \s+ master \s+ name \n
|
||||
((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmt]{1,5}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmrt]{1,6}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
|
||||
- do:
|
||||
cat.nodes:
|
||||
|
|
|
@ -36,6 +36,22 @@ pre-7.7 default behaviour by adjusting your <<logging>>.
|
|||
[[breaking_77_settings_changes]]
|
||||
=== Settings changes
|
||||
|
||||
[discrete]
|
||||
[[deprecate-listener-thread-pool]]
|
||||
==== `thread_pool.listener.size` and `thread_pool.listener.queue_size` have been deprecated
|
||||
The listener thread pool is no longer used internally by Elasticsearch.
|
||||
Therefore, these settings have been deprecated. You can safely remove these
|
||||
settings from the configuration of your nodes.
|
||||
|
||||
[discrete]
|
||||
[[deprecate-cluster-remote-connect]]
|
||||
==== `cluster.remote.connect` is deprecated in favor of `node.remote_cluster_client`
|
||||
Previously the setting `cluster.remote.connect` was used to configure whether or
|
||||
not the local node is capable of acting as a remote cluster client in
|
||||
cross-cluster search and cross-cluster replication. This setting is deprecated
|
||||
in favor of `node.remote_cluster_client` serves the same purpose and identifies
|
||||
the local node as having the `remote_cluster_client` role.
|
||||
|
||||
[discrete]
|
||||
[[deprecate-missing-realm-order]]
|
||||
==== Authentication realm `order` will be a required config in version 8.0.0.
|
||||
|
@ -72,14 +88,6 @@ for `gt` and `lte` boundaries, the same queries on `date_range` fields didn't
|
|||
do this. The behavior is now the same for both field types like documented in
|
||||
<<range-query-date-math-rounding>>.
|
||||
|
||||
[float]
|
||||
[[deprecate-listener-thread-pool]]
|
||||
==== `thread_pool.listener.size` and `thread_pool.listener.queue_size` have been deprecated
|
||||
|
||||
The listener thread pool is no longer used internally by Elasticsearch.
|
||||
Therefore, these settings have been deprecated. You can safely remove these
|
||||
settings from the configuration of your nodes.
|
||||
|
||||
[discrete]
|
||||
[[breaking_77_highlighters_changes]]
|
||||
=== Highlighters changes
|
||||
|
|
|
@ -797,8 +797,8 @@ An array of index names. Wildcards are supported. For example:
|
|||
`["it_ops_metrics", "server*"]`.
|
||||
+
|
||||
--
|
||||
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
|
||||
not be set to `false` on any {ml} nodes.
|
||||
NOTE: If any indices are in remote clusters then `node.remote_cluster_client`
|
||||
must not be set to `false` on any {ml} nodes.
|
||||
|
||||
--
|
||||
end::indices[]
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
[[modules-node]]
|
||||
== Node
|
||||
|
||||
Any time that you start an instance of Elasticsearch, you are starting a _node_.
|
||||
A collection of connected nodes is called a <<modules-cluster,cluster>>. If you
|
||||
Any time that you start an instance of Elasticsearch, you are starting a _node_.
|
||||
A collection of connected nodes is called a <<modules-cluster,cluster>>. If you
|
||||
are running a single node of {es}, then you have a cluster of one node.
|
||||
|
||||
Every node in the cluster can handle <<modules-http,HTTP>> and
|
||||
|
@ -11,7 +11,7 @@ exclusively for communication between nodes; the HTTP layer is used by REST
|
|||
clients.
|
||||
|
||||
All nodes know about all the other nodes in the cluster and can forward client
|
||||
requests to the appropriate node.
|
||||
requests to the appropriate node.
|
||||
|
||||
By default, a node is all of the following types: master-eligible, data, ingest,
|
||||
and machine learning (if available).
|
||||
|
@ -95,22 +95,22 @@ be elected to become the master node by the <<modules-discovery,master election
|
|||
process>>.
|
||||
|
||||
IMPORTANT: Master nodes must have access to the `data/` directory (just like
|
||||
`data` nodes) as this is where the cluster state is persisted between node
|
||||
`data` nodes) as this is where the cluster state is persisted between node
|
||||
restarts.
|
||||
|
||||
[float]
|
||||
[[dedicated-master-node]]
|
||||
==== Dedicated master-eligible node
|
||||
|
||||
It is important for the health of the cluster that the elected master node has
|
||||
the resources it needs to fulfill its responsibilities. If the elected master
|
||||
node is overloaded with other tasks then the cluster may not operate well. In
|
||||
particular, indexing and searching your data can be very resource-intensive, so
|
||||
in large or high-throughput clusters it is a good idea to avoid using the
|
||||
master-eligible nodes for tasks such as indexing and searching. You can do this
|
||||
by configuring three of your nodes to be dedicated master-eligible nodes.
|
||||
It is important for the health of the cluster that the elected master node has
|
||||
the resources it needs to fulfill its responsibilities. If the elected master
|
||||
node is overloaded with other tasks then the cluster may not operate well. In
|
||||
particular, indexing and searching your data can be very resource-intensive, so
|
||||
in large or high-throughput clusters it is a good idea to avoid using the
|
||||
master-eligible nodes for tasks such as indexing and searching. You can do this
|
||||
by configuring three of your nodes to be dedicated master-eligible nodes.
|
||||
Dedicated master-eligible nodes only have the `master` role, allowing them to
|
||||
focus on managing the cluster. While master nodes can also behave as
|
||||
focus on managing the cluster. While master nodes can also behave as
|
||||
<<coordinating-node,coordinating nodes>> and route search and indexing requests
|
||||
from clients to data nodes, it is better _not_ to use dedicated master nodes for
|
||||
this purpose.
|
||||
|
@ -127,7 +127,7 @@ node.ml: false <5>
|
|||
xpack.ml.enabled: true <6>
|
||||
node.transform: false <7>
|
||||
xpack.transform.enabled: true <8>
|
||||
cluster.remote.connect: false <9>
|
||||
node.remote_client_client: false <9>
|
||||
-------------------
|
||||
<1> The `node.master` role is enabled by default.
|
||||
<2> The `node.voting_only` role is disabled by default.
|
||||
|
@ -146,7 +146,7 @@ To create a dedicated master-eligible node in the {oss-dist}, set:
|
|||
node.master: true <1>
|
||||
node.data: false <2>
|
||||
node.ingest: false <3>
|
||||
cluster.remote.connect: false <4>
|
||||
node.remote_cluster_client: false <4>
|
||||
-------------------
|
||||
<1> The `node.master` role is enabled by default.
|
||||
<2> Disable the `node.data` role (enabled by default).
|
||||
|
@ -210,7 +210,7 @@ node.ml: false <5>
|
|||
xpack.ml.enabled: true <6>
|
||||
node.transform: false <7>
|
||||
xpack.transform.enabled: true <8>
|
||||
cluster.remote.connect: false <9>
|
||||
node.remote_cluster_client: false <9>
|
||||
-------------------
|
||||
<1> The `node.master` role is enabled by default.
|
||||
<2> Enable the `node.voting_only` role (disabled by default).
|
||||
|
@ -243,7 +243,7 @@ node.data: true <3>
|
|||
node.ingest: false <4>
|
||||
node.ml: false <5>
|
||||
node.transform: false <6>
|
||||
cluster.remote.connect: false <7>
|
||||
node.remote_cluster_client: false <7>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> The `node.voting_only` role is disabled by default.
|
||||
|
@ -259,7 +259,7 @@ To create a dedicated data node in the {oss-dist}, set:
|
|||
node.master: false <1>
|
||||
node.data: true <2>
|
||||
node.ingest: false <3>
|
||||
cluster.remote.connect: false <4>
|
||||
node.remote_cluster_client: false <4>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> The `node.data` role is enabled by default.
|
||||
|
@ -285,7 +285,7 @@ node.data: false <3>
|
|||
node.ingest: true <4>
|
||||
node.ml: false <5>
|
||||
node.transform: false <6>
|
||||
cluster.remote.connect: false <7>
|
||||
node.remote_cluster_client: false <7>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> The `node.voting_only` role is disabled by default.
|
||||
|
@ -302,7 +302,7 @@ To create a dedicated ingest node in the {oss-dist}, set:
|
|||
node.master: false <1>
|
||||
node.data: false <2>
|
||||
node.ingest: true <3>
|
||||
cluster.remote.connect: false <4>
|
||||
node.remote_cluster_client: false <4>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> Disable the `node.data` role (enabled by default).
|
||||
|
@ -340,7 +340,7 @@ node.data: false <3>
|
|||
node.ingest: false <4>
|
||||
node.ml: false <5>
|
||||
node.transform: false <6>
|
||||
cluster.remote.connect: false <7>
|
||||
node.remote_cluster_client: false <7>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> The `node.voting_only` role is disabled by default.
|
||||
|
@ -357,7 +357,7 @@ To create a dedicated coordinating node in the {oss-dist}, set:
|
|||
node.master: false <1>
|
||||
node.data: false <2>
|
||||
node.ingest: false <3>
|
||||
cluster.remote.connect: false <4>
|
||||
node.remote_cluster_client: false <4>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> Disable the `node.data` role (enabled by default).
|
||||
|
@ -390,7 +390,7 @@ node.ml: true <5>
|
|||
xpack.ml.enabled: true <6>
|
||||
node.transform: false <7>
|
||||
xpack.transform.enabled: true <8>
|
||||
cluster.remote.connect: false <9>
|
||||
node.remote_cluster_client: false <9>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role (enabled by default).
|
||||
<2> The `node.voting_only` role is disabled by default.
|
||||
|
@ -425,7 +425,7 @@ node.ingest: false <4>
|
|||
node.ml: false <5>
|
||||
node.transform: true <6>
|
||||
xpack.transform.enabled: true <7>
|
||||
cluster.remote.connect: false <8>
|
||||
node.remote_cluster_client: false <8>
|
||||
-------------------
|
||||
<1> Disable the `node.master` role.
|
||||
<2> Disable the `node.voting_only`.
|
||||
|
|
|
@ -254,12 +254,12 @@ and <<remote-cluster-proxy-settings,proxy mode settings>> are described below.
|
|||
The time to wait for remote connections to be established when the node
|
||||
starts. The default is `30s`.
|
||||
|
||||
`cluster.remote.connect`::
|
||||
`node.remote_cluster_client`::
|
||||
|
||||
By default, any node in the cluster can act as a cross-cluster client and
|
||||
connect to remote clusters. The `cluster.remote.connect` setting can be set to
|
||||
`false` (defaults to `true`) to prevent certain nodes from connecting to
|
||||
remote clusters. Remote cluster requests must be sent to a node that is
|
||||
connect to remote clusters. The `node.remote_cluster_client` setting can be
|
||||
set to `false` (defaults to `true`) to prevent certain nodes from connecting
|
||||
to remote clusters. Remote cluster requests must be sent to a node that is
|
||||
allowed to act as a cross-cluster client.
|
||||
|
||||
`cluster.remote.<cluster_alias>.skip_unavailable`::
|
||||
|
|
|
@ -35,7 +35,7 @@ task 'remote-cluster'(type: RestIntegTestTask) {
|
|||
|
||||
testClusters.'remote-cluster' {
|
||||
numberOfNodes = 2
|
||||
setting 'cluster.remote.connect', 'false'
|
||||
setting 'node.remote_cluster_client', 'false'
|
||||
}
|
||||
|
||||
task mixedClusterTest(type: RestIntegTestTask) {
|
||||
|
@ -50,7 +50,7 @@ testClusters.mixedClusterTest {
|
|||
setting 'cluster.remote.my_remote_cluster.seeds',
|
||||
{ "\"${testClusters.'remote-cluster'.getAllTransportPortURI().get(0)}\"" }
|
||||
setting 'cluster.remote.connections_per_cluster', '1'
|
||||
setting 'cluster.remote.connect', 'true'
|
||||
setting 'node.remote_cluster_client', 'true'
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
- match:
|
||||
$body: |
|
||||
/ #ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
|
||||
^ ((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)?\s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmt]{1,5}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
^ ((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)?\s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmrt]{1,6}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
|
||||
- do:
|
||||
cat.nodes:
|
||||
|
@ -16,7 +16,7 @@
|
|||
- match:
|
||||
$body: |
|
||||
/^ ip \s+ heap\.percent \s+ ram\.percent \s+ cpu \s+ load_1m \s+ load_5m \s+ load_15m \s+ node\.role \s+ master \s+ name \n
|
||||
((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmt]{1,5}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
((\d{1,3}\.){3}\d{1,3} \s+ \d+ \s+ \d* \s+ (-)?\d* \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ ((-)?\d*(\.\d+)?)? \s+ (-|[dilmrt]{1,6}) \s+ [-*x] \s+ (\S+\s?)+ \n)+ $/
|
||||
|
||||
- do:
|
||||
cat.nodes:
|
||||
|
|
|
@ -73,6 +73,10 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
|
|||
return Node.NODE_INGEST_SETTING.get(settings);
|
||||
}
|
||||
|
||||
public static boolean isRemoteClusterClient(final Settings settings) {
|
||||
return Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
|
||||
}
|
||||
|
||||
private final String nodeName;
|
||||
private final String nodeId;
|
||||
private final String ephemeralId;
|
||||
|
@ -282,7 +286,7 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
|
|||
} else {
|
||||
// an old node will only understand legacy roles since pluggable roles is a new concept
|
||||
final List<DiscoveryNodeRole> rolesToWrite =
|
||||
roles.stream().filter(DiscoveryNodeRole.BUILT_IN_ROLES::contains).collect(Collectors.toList());
|
||||
roles.stream().filter(DiscoveryNodeRole.LEGACY_ROLES::contains).collect(Collectors.toList());
|
||||
out.writeVInt(rolesToWrite.size());
|
||||
for (final DiscoveryNodeRole role : rolesToWrite) {
|
||||
if (role == DiscoveryNodeRole.MASTER_ROLE) {
|
||||
|
@ -357,6 +361,15 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
|
|||
return roles.contains(DiscoveryNodeRole.INGEST_ROLE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether or not the node can be a remote cluster client.
|
||||
*
|
||||
* @return true if the node can be a remote cluster client, false otherwise
|
||||
*/
|
||||
public boolean isRemoteClusterClient() {
|
||||
return roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a set of all the roles that the node fulfills.
|
||||
* If the node doesn't have any specific role, the set is returned empty, which means that the node is a coordinating only node.
|
||||
|
|
|
@ -137,11 +137,23 @@ public abstract class DiscoveryNodeRole {
|
|||
|
||||
};
|
||||
|
||||
public static final DiscoveryNodeRole REMOTE_CLUSTER_CLIENT_ROLE = new DiscoveryNodeRole("remote_cluster_client", "r") {
|
||||
|
||||
@Override
|
||||
protected Setting<Boolean> roleSetting() {
|
||||
return Node.NODE_REMOTE_CLUSTER_CLIENT;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* The built-in node roles.
|
||||
*/
|
||||
public static Set<DiscoveryNodeRole> BUILT_IN_ROLES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE)));
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE)));
|
||||
|
||||
static Set<DiscoveryNodeRole> LEGACY_ROLES =
|
||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE)));
|
||||
|
||||
/**
|
||||
* Represents an unknown role. This can occur if a newer version adds a role that an older version does not know about, or a newer
|
||||
|
|
|
@ -462,6 +462,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
Node.NODE_DATA_SETTING,
|
||||
Node.NODE_MASTER_SETTING,
|
||||
Node.NODE_INGEST_SETTING,
|
||||
Node.NODE_REMOTE_CLUSTER_CLIENT,
|
||||
Node.NODE_ATTRIBUTES,
|
||||
Node.NODE_LOCAL_STORAGE_SETTING,
|
||||
AutoCreateIndex.AUTO_CREATE_INDEX_SETTING,
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.LogConfigurator;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
|
@ -41,7 +40,7 @@ public class ByteSizeValue implements Writeable, Comparable<ByteSizeValue>, ToXC
|
|||
/**
|
||||
* We have to lazy initialize the deprecation logger as otherwise a static logger here would be constructed before logging is configured
|
||||
* leading to a runtime failure (see {@link LogConfigurator#checkErrorListener()} ). The premature construction would come from any
|
||||
* {@link Setting} object constructed in, for example, settings in {@link org.elasticsearch.common.network.NetworkService}.
|
||||
* {@link ByteSizeValue} object constructed in, for example, settings in {@link org.elasticsearch.common.network.NetworkService}.
|
||||
*/
|
||||
static class DeprecationLoggerHolder {
|
||||
static DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(ByteSizeValue.class));
|
||||
|
|
|
@ -158,6 +158,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.tasks.TaskResultsService;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportInterceptor;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -203,6 +204,8 @@ public class Node implements Closeable {
|
|||
Setting.boolSetting("node.master", true, Property.NodeScope);
|
||||
public static final Setting<Boolean> NODE_INGEST_SETTING =
|
||||
Setting.boolSetting("node.ingest", true, Property.NodeScope);
|
||||
public static final Setting<Boolean> NODE_REMOTE_CLUSTER_CLIENT =
|
||||
Setting.boolSetting("node.remote_cluster_client", RemoteClusterService.ENABLE_REMOTE_CLUSTERS, Property.NodeScope);
|
||||
|
||||
/**
|
||||
* controls whether the node is allowed to persist things like metadata to disk
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -38,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -65,7 +67,7 @@ import static org.elasticsearch.common.settings.Setting.timeSetting;
|
|||
*/
|
||||
public final class RemoteClusterService extends RemoteClusterAware implements Closeable {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);
|
||||
private final Logger logger = LogManager.getLogger(RemoteClusterService.class);
|
||||
|
||||
static {
|
||||
// remove search.remote.* settings in 8.0.0
|
||||
|
@ -116,6 +118,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
Setting.boolSetting(
|
||||
"cluster.remote.connect",
|
||||
SEARCH_ENABLE_REMOTE_CLUSTERS, // the default needs to be true when fallback is removed
|
||||
Setting.Property.Deprecated,
|
||||
Setting.Property.NodeScope);
|
||||
|
||||
public static final Setting.AffixSetting<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE =
|
||||
|
@ -175,7 +178,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
|
||||
RemoteClusterService(Settings settings, TransportService transportService) {
|
||||
super(settings);
|
||||
this.enabled = ENABLE_REMOTE_CLUSTERS.get(settings);
|
||||
this.enabled = Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
|
@ -256,7 +259,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
|
||||
RemoteClusterConnection getRemoteClusterConnection(String cluster) {
|
||||
if (enabled == false) {
|
||||
throw new IllegalArgumentException("remote cluster service is not enabled");
|
||||
throw new IllegalArgumentException(
|
||||
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
|
||||
}
|
||||
RemoteClusterConnection connection = remoteClusters.get(cluster);
|
||||
if (connection == null) {
|
||||
|
@ -396,7 +400,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
*/
|
||||
public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String, String, DiscoveryNode>> listener) {
|
||||
if (enabled == false) {
|
||||
throw new IllegalArgumentException("remote cluster service is not enabled");
|
||||
throw new IllegalArgumentException(
|
||||
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
|
||||
}
|
||||
Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
|
||||
for (String cluster : clusters) {
|
||||
|
@ -442,7 +447,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
*/
|
||||
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
|
||||
if (transportService.getRemoteClusterService().isEnabled() == false) {
|
||||
throw new IllegalArgumentException("remote cluster service is not enabled");
|
||||
throw new IllegalArgumentException(
|
||||
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
|
||||
}
|
||||
if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {
|
||||
throw new NoSuchRemoteClusterException(clusterAlias);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
|
@ -86,7 +87,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
protected final TaskManager taskManager;
|
||||
private final TransportInterceptor.AsyncSender asyncSender;
|
||||
private final Function<BoundTransportAddress, DiscoveryNode> localNodeFactory;
|
||||
private final boolean connectToRemoteCluster;
|
||||
private final boolean remoteClusterClient;
|
||||
private final Transport.ResponseHandlers responseHandlers;
|
||||
private final TransportInterceptor interceptor;
|
||||
|
||||
|
@ -171,13 +172,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
taskManager = createTaskManager(settings, threadPool, taskHeaders);
|
||||
this.interceptor = transportInterceptor;
|
||||
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
|
||||
this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
|
||||
this.remoteClusterClient = Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
|
||||
remoteClusterService = new RemoteClusterService(settings, this);
|
||||
responseHandlers = transport.getResponseHandlers();
|
||||
if (clusterSettings != null) {
|
||||
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
|
||||
clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
|
||||
if (connectToRemoteCluster) {
|
||||
if (remoteClusterClient) {
|
||||
remoteClusterService.listenForUpdates(clusterSettings);
|
||||
}
|
||||
}
|
||||
|
@ -236,7 +237,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
}
|
||||
localNode = localNodeFactory.apply(transport.boundAddress());
|
||||
|
||||
if (connectToRemoteCluster) {
|
||||
if (remoteClusterClient) {
|
||||
// here we start to connect to the remote clusters
|
||||
remoteClusterService.initializeRemoteClusters();
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
|
|||
expectedCounts.put(DiscoveryNodeRole.DATA_ROLE.roleName(), 1);
|
||||
expectedCounts.put(DiscoveryNodeRole.MASTER_ROLE.roleName(), 1);
|
||||
expectedCounts.put(DiscoveryNodeRole.INGEST_ROLE.roleName(), 1);
|
||||
expectedCounts.put(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName(), 1);
|
||||
expectedCounts.put(ClusterStatsNodes.Counts.COORDINATING_ONLY, 0);
|
||||
int numNodes = randomIntBetween(1, 5);
|
||||
|
||||
|
@ -76,9 +77,13 @@ public class ClusterStatsIT extends ESIntegTestCase {
|
|||
boolean isDataNode = randomBoolean();
|
||||
boolean isMasterNode = randomBoolean();
|
||||
boolean isIngestNode = randomBoolean();
|
||||
Settings settings = Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), isDataNode)
|
||||
.put(Node.NODE_MASTER_SETTING.getKey(), isMasterNode).put(Node.NODE_INGEST_SETTING.getKey(), isIngestNode)
|
||||
.build();
|
||||
boolean isRemoteClusterClientNode = randomBoolean();
|
||||
Settings settings = Settings.builder()
|
||||
.put(Node.NODE_DATA_SETTING.getKey(), isDataNode)
|
||||
.put(Node.NODE_MASTER_SETTING.getKey(), isMasterNode)
|
||||
.put(Node.NODE_INGEST_SETTING.getKey(), isIngestNode)
|
||||
.put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), isRemoteClusterClientNode)
|
||||
.build();
|
||||
internalCluster().startNode(settings);
|
||||
total++;
|
||||
waitForNodes(total);
|
||||
|
@ -92,7 +97,10 @@ public class ClusterStatsIT extends ESIntegTestCase {
|
|||
if (isIngestNode) {
|
||||
incrementCountForRole(DiscoveryNodeRole.INGEST_ROLE.roleName(), expectedCounts);
|
||||
}
|
||||
if (!isDataNode && !isMasterNode && !isIngestNode) {
|
||||
if (isRemoteClusterClientNode) {
|
||||
incrementCountForRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName(), expectedCounts);
|
||||
}
|
||||
if (!isDataNode && !isMasterNode && !isIngestNode && !isRemoteClusterClientNode) {
|
||||
incrementCountForRole(ClusterStatsNodes.Counts.COORDINATING_ONLY, expectedCounts);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,13 +22,18 @@ package org.elasticsearch.cluster.node;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DiscoveryNodeTests extends ESTestCase {
|
||||
|
||||
|
@ -58,4 +63,29 @@ public class DiscoveryNodeTests extends ESTestCase {
|
|||
assertEquals(transportAddress.getAddress(), serialized.getAddress().getAddress());
|
||||
assertEquals(transportAddress.getPort(), serialized.getAddress().getPort());
|
||||
}
|
||||
|
||||
public void testDiscoveryNodeIsRemoteClusterClientDefault() {
|
||||
runTestDiscoveryNodeIsRemoteClusterClient(Settings.EMPTY, true);
|
||||
}
|
||||
|
||||
public void testDiscoveryNodeIsRemoteClusterClientSet() {
|
||||
runTestDiscoveryNodeIsRemoteClusterClient(Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), true).build(), true);
|
||||
}
|
||||
|
||||
public void testDiscoveryNodeIsRemoteClusterClientUnset() {
|
||||
runTestDiscoveryNodeIsRemoteClusterClient(Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build(), false);
|
||||
}
|
||||
|
||||
private void runTestDiscoveryNodeIsRemoteClusterClient(final Settings settings, final boolean expected) {
|
||||
final DiscoveryNode node = DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node");
|
||||
assertThat(node.isRemoteClusterClient(), equalTo(expected));
|
||||
final Set<DiscoveryNodeRole> expectedRoles = new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES);
|
||||
if (expected) {
|
||||
assertThat(node.getRoles(), equalTo(expectedRoles));
|
||||
} else {
|
||||
expectedRoles.remove(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
|
||||
assertThat(node.getRoles(), equalTo(expectedRoles));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -52,7 +53,7 @@ public class RemoteClusterClientTests extends ESTestCase {
|
|||
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
|
||||
|
||||
Settings localSettings = Settings.builder()
|
||||
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
|
||||
.put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), true)
|
||||
.put("cluster.remote.test.seeds",
|
||||
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
|
||||
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -81,7 +82,7 @@ public class RemoteClusterClientTests extends ESTestCase {
|
|||
remoteSettings)) {
|
||||
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
|
||||
Settings localSettings = Settings.builder()
|
||||
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
|
||||
.put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), true)
|
||||
.put("cluster.remote.test.seeds",
|
||||
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
|
||||
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -122,14 +123,14 @@ public class RemoteClusterClientTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRemoteClusterServiceNotEnabled() {
|
||||
final Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
|
||||
final Settings settings = Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build();
|
||||
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
|
||||
final IllegalArgumentException e =
|
||||
expectThrows(IllegalArgumentException.class, () -> remoteClusterService.getRemoteClusterClient(threadPool, "test"));
|
||||
assertThat(e.getMessage(), equalTo("remote cluster service is not enabled"));
|
||||
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -851,25 +852,25 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRemoteClusterServiceNotEnabledGetRemoteClusterConnection() {
|
||||
final Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
|
||||
final Settings settings = Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build();
|
||||
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
final IllegalArgumentException e =
|
||||
expectThrows(IllegalArgumentException.class, () -> service.getRemoteClusterService().getRemoteClusterConnection("test"));
|
||||
assertThat(e.getMessage(), equalTo("remote cluster service is not enabled"));
|
||||
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testRemoteClusterServiceNotEnabledGetCollectNodes() {
|
||||
final Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
|
||||
final Settings settings = Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build();
|
||||
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> service.getRemoteClusterService().collectNodes(Collections.emptySet(), ActionListener.wrap(r -> {}, r -> {})));
|
||||
assertThat(e.getMessage(), equalTo("remote cluster service is not enabled"));
|
||||
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,20 +28,21 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS;
|
||||
import static org.elasticsearch.node.Node.NODE_REMOTE_CLUSTER_CLIENT;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS;
|
||||
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER;
|
||||
import static org.hamcrest.Matchers.emptyCollectionOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
|
@ -89,8 +90,22 @@ public class RemoteClusterSettingsTests extends ESTestCase {
|
|||
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_ENABLE_REMOTE_CLUSTERS});
|
||||
}
|
||||
|
||||
public void testEnableRemoteClustersDefault() {
|
||||
assertTrue(ENABLE_REMOTE_CLUSTERS.get(Settings.EMPTY));
|
||||
public void testRemoteClusterClientDefault() {
|
||||
assertTrue(NODE_REMOTE_CLUSTER_CLIENT.get(Settings.EMPTY));
|
||||
}
|
||||
|
||||
public void testDisableRemoteClusterClient() {
|
||||
assertFalse(NODE_REMOTE_CLUSTER_CLIENT.get(Settings.builder().put(NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build()));
|
||||
}
|
||||
|
||||
public void testDisableEnableRemoteClusters() {
|
||||
assertFalse(NODE_REMOTE_CLUSTER_CLIENT.get(Settings.builder().put(ENABLE_REMOTE_CLUSTERS.getKey(), false).build()));
|
||||
assertSettingDeprecationsAndWarnings(new Setting<?>[]{ENABLE_REMOTE_CLUSTERS});
|
||||
}
|
||||
|
||||
public void testDisableSearchEnableRemoteClusters() {
|
||||
assertFalse(NODE_REMOTE_CLUSTER_CLIENT.get(Settings.builder().put(SEARCH_ENABLE_REMOTE_CLUSTERS.getKey(), false).build()));
|
||||
assertSettingDeprecationsAndWarnings(new Setting<?>[]{SEARCH_ENABLE_REMOTE_CLUSTERS});
|
||||
}
|
||||
|
||||
public void testSkipUnavailableFallback() {
|
||||
|
|
|
@ -53,7 +53,7 @@ public final class SourceDestValidator {
|
|||
public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase";
|
||||
public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]"
|
||||
+ " but the current node [{1}] is not allowed to connect to remote clusters."
|
||||
+ " Please enable cluster.remote.connect for all data nodes.";
|
||||
+ " Please enable remote.cluster_client for all data nodes.";
|
||||
public static final String ERROR_REMOTE_CLUSTER_SEARCH = "Error resolving remote source: {0}";
|
||||
public static final String UNKNOWN_REMOTE_CLUSTER_LICENSE = "Error during license check ({0}) for remote cluster "
|
||||
+ "alias(es) {1}, error: {2}";
|
||||
|
|
|
@ -51,7 +51,7 @@ public final class Messages {
|
|||
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
|
||||
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
|
||||
" but the current node [{2}] is not allowed to connect to remote clusters." +
|
||||
" Please enable cluster.remote.connect for all machine learning nodes.";
|
||||
" Please enable node.remote_cluster_client for all machine learning nodes.";
|
||||
|
||||
public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
|
||||
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -38,7 +39,6 @@ import org.elasticsearch.persistent.PersistentTasksService;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||
|
@ -90,7 +90,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
private final AnomalyDetectionAuditor auditor;
|
||||
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final boolean remoteClusterSearchSupported;
|
||||
private final boolean remoteClusterClient;
|
||||
|
||||
@Inject
|
||||
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
||||
|
@ -109,7 +109,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
this.auditor = auditor;
|
||||
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
|
||||
this.remoteClusterClient = Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
|
||||
}
|
||||
|
||||
static void validate(Job job,
|
||||
|
@ -200,7 +200,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
response -> {
|
||||
if (response.isSuccess() == false) {
|
||||
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
|
||||
} else if (remoteClusterSearchSupported == false) {
|
||||
} else if (remoteClusterClient == false) {
|
||||
listener.onFailure(
|
||||
ExceptionsHelper.badRequestException(Messages.getMessage(
|
||||
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.xpack.core.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
|
||||
|
@ -51,7 +51,7 @@ public class DatafeedJobBuilder {
|
|||
private final JobResultsProvider jobResultsProvider;
|
||||
private final DatafeedConfigProvider datafeedConfigProvider;
|
||||
private final JobResultsPersister jobResultsPersister;
|
||||
private final boolean remoteClusterSearchSupported;
|
||||
private final boolean remoteClusterClient;
|
||||
private final String nodeName;
|
||||
|
||||
public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
|
||||
|
@ -66,7 +66,7 @@ public class DatafeedJobBuilder {
|
|||
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
|
||||
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
|
||||
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
|
||||
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
|
||||
this.remoteClusterClient = Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings);
|
||||
this.nodeName = nodeName;
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ public class DatafeedJobBuilder {
|
|||
configBuilder -> {
|
||||
try {
|
||||
datafeedConfigHolder.set(configBuilder.build());
|
||||
if (remoteClusterSearchSupported == false) {
|
||||
if (remoteClusterClient == false) {
|
||||
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
|
||||
if (remoteIndices.isEmpty() == false) {
|
||||
listener.onFailure(
|
||||
|
|
|
@ -11,9 +11,9 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.mock.orig.Mockito;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.xpack.core.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||
|
@ -207,7 +207,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
|
||||
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
|
||||
Settings settings = Settings.builder().put(Node.NODE_REMOTE_CLUSTER_CLIENT.getKey(), false).build();
|
||||
datafeedJobBuilder =
|
||||
new DatafeedJobBuilder(
|
||||
client,
|
||||
|
|
|
@ -462,7 +462,8 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
|
|||
+ "\"coordinating_only\":0,"
|
||||
+ "\"data\":0,"
|
||||
+ "\"ingest\":0,"
|
||||
+ "\"master\":1"
|
||||
+ "\"master\":1,"
|
||||
+ "\"remote_cluster_client\":0"
|
||||
+ "},"
|
||||
+ "\"versions\":["
|
||||
+ "\"6.0.0-alpha2\""
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.env.Environment;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.plugins.PersistentTaskPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -44,7 +45,6 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.FixedExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
|
@ -382,7 +382,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
|
|||
Settings.Builder additionalSettings = Settings.builder();
|
||||
|
||||
additionalSettings.put(transformEnabledNodeAttribute, TRANSFORM_ENABLED_NODE.get(settings));
|
||||
additionalSettings.put(transformRemoteEnabledNodeAttribute, RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings));
|
||||
additionalSettings.put(transformRemoteEnabledNodeAttribute, Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings));
|
||||
|
||||
return additionalSettings.build();
|
||||
}
|
||||
|
|
|
@ -35,12 +35,12 @@ import org.elasticsearch.license.License;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
@ -122,7 +122,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
|
|||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
|
|
|
@ -31,10 +31,10 @@ import org.elasticsearch.license.License;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
@ -137,7 +137,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
|
|||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
|
|
|
@ -31,11 +31,11 @@ import org.elasticsearch.license.License;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
@ -136,7 +136,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
|
|
|
@ -27,10 +27,10 @@ import org.elasticsearch.license.License;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
@ -125,7 +125,7 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
|
|||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
Node.NODE_REMOTE_CLUSTER_CLIENT.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
|
|
|
@ -19,7 +19,7 @@ public class TransformTests extends ESTestCase {
|
|||
Settings.Builder builder = Settings.builder();
|
||||
boolean transformEnabled = randomBoolean();
|
||||
boolean transformPluginEnabled = randomBoolean();
|
||||
boolean remoteEnabled = randomBoolean();
|
||||
boolean remoteClusterClient = randomBoolean();
|
||||
|
||||
// randomly use explicit or default setting
|
||||
if ((transformEnabled && randomBoolean()) == false) {
|
||||
|
@ -27,8 +27,8 @@ public class TransformTests extends ESTestCase {
|
|||
}
|
||||
|
||||
// randomly use explicit or default setting
|
||||
if ((remoteEnabled && randomBoolean()) == false) {
|
||||
builder.put("cluster.remote.connect", remoteEnabled);
|
||||
if ((remoteClusterClient && randomBoolean()) == false) {
|
||||
builder.put("node.remote_cluster_client", remoteClusterClient);
|
||||
}
|
||||
|
||||
if (transformPluginEnabled == false) {
|
||||
|
@ -43,7 +43,7 @@ public class TransformTests extends ESTestCase {
|
|||
Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.node"))
|
||||
);
|
||||
assertEquals(
|
||||
transformPluginEnabled && remoteEnabled,
|
||||
transformPluginEnabled && remoteClusterClient,
|
||||
Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.remote_connect"))
|
||||
);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ task 'remote-cluster'(type: RestIntegTestTask) {
|
|||
testClusters.'remote-cluster' {
|
||||
testDistribution = 'DEFAULT'
|
||||
numberOfNodes = 2
|
||||
setting 'cluster.remote.connect', "false"
|
||||
setting 'node.remote_cluster_client', "false"
|
||||
setting 'xpack.ilm.enabled', 'false'
|
||||
setting 'xpack.security.enabled', 'true'
|
||||
setting 'xpack.watcher.enabled', 'false'
|
||||
|
@ -53,7 +53,7 @@ testClusters.'mixed-cluster' {
|
|||
testClusters.'remote-cluster'.getAllTransportPortURI().collect { "\"$it\"" }.toString()
|
||||
}
|
||||
setting 'cluster.remote.connections_per_cluster', "1"
|
||||
setting 'cluster.remote.connect', "true"
|
||||
setting 'node.remote_cluster_client', "true"
|
||||
|
||||
user username: "test_user", password: "x-pack-test-password"
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ task 'remote-cluster'(type: RestIntegTestTask) {
|
|||
testClusters.'remote-cluster' {
|
||||
testDistribution = 'DEFAULT'
|
||||
numberOfNodes = 2
|
||||
setting 'cluster.remote.connect', "false"
|
||||
setting 'node.remote_cluster_client', "false"
|
||||
setting 'xpack.ilm.enabled', 'false'
|
||||
setting 'xpack.security.enabled', 'true'
|
||||
setting 'xpack.watcher.enabled', 'false'
|
||||
|
@ -52,7 +52,7 @@ testClusters.'mixed-cluster' {
|
|||
testClusters.'remote-cluster'.getAllTransportPortURI().collect { "\"$it\"" }.toString()
|
||||
}
|
||||
setting 'cluster.remote.connections_per_cluster', "1"
|
||||
setting 'cluster.remote.connect', "true"
|
||||
setting 'node.remote_cluster_client', "true"
|
||||
|
||||
user username: "test_user", password: "x-pack-test-password"
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue