/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.node;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Key;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
/**
* A node represents a node within a cluster (<tt>cluster.name</tt>). The {@link #client()} can be used
* in order to use a {@link Client} to perform actions/operations against the cluster.
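* A minimal, illustrative sketch of embedding a node (the paths and names below are
* hypothetical placeholders, not a recommended configuration):
* <pre>{@code
* try (Node node = new Node(Settings.builder()
*         .put("path.home", "/tmp/es-home")   // hypothetical home directory
*         .put("node.name", "example-node")
*         .build())) {
*     node.start();
*     Client client = node.client();
*     // issue requests against the cluster through the client
* }
* }</pre>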
*/
public class Node implements Closeable {
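// Node role and wiring settings: node.portsfile writes the bound ports to a file on startup,
// node.data controls whether this node may hold shard data, node.master whether it may be
// elected master, and node.ingest whether it runs ingest pipelines.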
public static final Setting<Boolean> WRITE_PORTS_FILE_SETTING =
Setting.boolSetting("node.portsfile", false, Property.NodeScope);
public static final Setting<Boolean> NODE_DATA_SETTING = Setting.boolSetting("node.data", true, Property.NodeScope);
public static final Setting<Boolean> NODE_MASTER_SETTING =
Setting.boolSetting("node.master", true, Property.NodeScope);
public static final Setting<Boolean> NODE_INGEST_SETTING =
Setting.boolSetting("node.ingest", true, Property.NodeScope);
/**
* Controls whether the node is allowed to persist things like metadata to disk.
* Note that this does not control whether the node stores actual indices (see
* {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING}
* and {@link #NODE_MASTER_SETTING} must also be false.
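* For illustration only, a node that keeps nothing on disk would combine these settings
* (a sketch, not a recommended configuration):
* <pre>{@code
* Settings noLocalStorage = Settings.builder()
*     .put("node.local_storage", false)
*     .put("node.master", false)
*     .put("node.data", false)
*     .build();
* }</pre>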
*
*/
public static final Setting<Boolean> NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting("node.local_storage", true, Property.NodeScope);
public static final Setting<String> NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope);
public static final Setting.AffixSetting<String> NODE_ATTRIBUTES = Setting.prefixKeySetting("node.attr.", (key) ->
new Setting<>(key, "", (value) -> {
if (value.length() > 0
&& (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(value.length() - 1)))) {
throw new IllegalArgumentException(key + " cannot have leading or trailing whitespace " +
"[" + value + "]");
}
return value;
}, Property.NodeScope));
public static final Setting<String> BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> {
switch (s) {
case "hierarchy":
case "none":
return s;
default:
throw new IllegalArgumentException("indices.breaker.type must be one of [hierarchy, none] but was: " + s);
}
}, Setting.Property.NodeScope);
/**
* Adds a default node name to the given settings, if a node name isn't already set
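* For example (with an illustrative node id), {@code addNodeNameIfNeeded(Settings.EMPTY, "aAmiW40abcd")}
* returns settings with {@code node.name} set to {@code aAmiW40}.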
* @return the given settings if the node name is already set, or a new copy with a default node name set.
*/
public static final Settings addNodeNameIfNeeded(Settings settings, final String nodeId) {
if (NODE_NAME_SETTING.exists(settings)) {
return settings;
}
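// derive the default node name from the first seven characters of the persistent node id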
return Settings.builder().put(settings).put(NODE_NAME_SETTING.getKey(), nodeId.substring(0, 7)).build();
}
private static final String CLIENT_TYPE = "node";
private final Lifecycle lifecycle = new Lifecycle();
private final Injector injector;
private final Settings settings;
private final Environment environment;
private final NodeEnvironment nodeEnvironment;
private final PluginsService pluginsService;
private final NodeClient client;
private final Collection<LifecycleComponent> pluginLifecycleComponents;
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
/**
* Constructs a node with the given settings.
*
* @param preparedSettings Base settings to configure the node with
*/
public Node(Settings preparedSettings) {
this(InternalSettingsPreparer.prepareEnvironment(preparedSettings));
}
public Node(Environment environment) {
this(environment, Collections.emptyList());
}
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
{
// use temp logger just to say we are starting. we can't use it later on because the node name might not be set
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
logger.info("initializing ...");
}
try {
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to create node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
// this must be captured after the node name is possibly added to the settings
final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
if (hadPredefinedNodeName == false) {
logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
} else {
logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
}
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
jvmInfo.pid(),
Build.CURRENT.shortHash(),
Build.CURRENT.date(),
Constants.OS_NAME,
Constants.OS_VERSION,
Constants.OS_ARCH,
Constants.JVM_VENDOR,
Constants.JVM_NAME,
Constants.JAVA_VERSION,
Constants.JVM_VERSION);
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
if (logger.isDebugEnabled()) {
logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
}
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
// create the environment based on the finalized (processed) view of the settings
// this is just to make sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings, environment.configFile());
Environment.assertEquivalent(environment, this.environment);
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
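// gather all settings registered by plugins and by the thread pool's executor builders so the
// SettingsModule created below can validate and register them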
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
// this is as early as we can validate settings at this point; we already passed them to the ScriptModule and the ThreadPool,
// so we might be too late here already
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
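// the network service resolves bind/publish host names, consulting any custom name resolvers
// contributed by discovery plugins (for example, cloud discovery plugins)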
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
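// the ingest service builds pipelines from processors contributed by IngestPlugins and can make
// use of the script service and analysis registry created above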
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
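// the cluster info service periodically samples disk usage and shard sizes; the disk threshold
// monitor listens to that info so allocation can react when nodes cross the disk watermarks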
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
listener::onNewInfo);
final UsageService usageService = new UsageService(settings);
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before the others, or we can get confusing injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
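// gather all NamedWriteable entries (core modules plus plugins) into a single registry; the
// transport layer uses it to deserialize polymorphic objects by their registered names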
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
indicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream()),
ClusterModule.getNamedWriteables().stream())
.flatMap(Function.identity()).collect(Collectors.toList());
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
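// same pattern for NamedXContent: collect every registered parser so that XContent (e.g. JSON)
// can be parsed back into the named objects modules and plugins have registered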
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
NetworkModule.getNamedXContents().stream(),
searchModule.getNamedXContents().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
client, metaStateService);
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry).stream())
.collect(Collectors.toList());
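// the action module wires transport actions, action filters and REST handlers, including those
// contributed through ActionPlugin implementations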
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
modules.add(actionModule);
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexTemplateMetaDataUpgrader)
.collect(Collectors.toList());
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
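// task headers declared by action plugins (plus X-Opaque-Id) are carried over from the thread
// context onto tasks created for incoming transport requests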
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
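// HTTP may be disabled (http.enabled=false), e.g. on transport-only nodes; in that case bind a
// null provider so that Guice injection of HttpServerTransport still resolves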
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {
httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
httpServerTransport = null;
}
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService());
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService);
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
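// persistent tasks: collect the executors contributed by PersistentTaskPlugins and set up the
// registry plus the cluster-level and node-level services that assign and manage those tasks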
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
.flatMap(List::stream)
.collect(toList());
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService =
new PersistentTasksClusterService(settings, registry, clusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
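// expose the instances constructed above to Guice so components that are still created through
// the injector can have them injected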
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(searchService);
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,
searchService::createReduceContext));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector = modules.createInjector();
// TODO hack around circular dependency problems in AllocationService
clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
logger.info("initialized");
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
static void warnIfPreRelease(final Version version, final boolean isSnapshot, final Logger logger) {
if (!version.isRelease() || isSnapshot) {
logger.warn(
"version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
Version.displayVersion(version, isSnapshot));
}
}
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings, Set<String> taskHeaders) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
}
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}
/**
* The settings that were used to create the node.
*/
public Settings settings() {
return this.settings;
}
/**
* A client that can be used to execute actions (operations) against the cluster.
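* <p>
* Illustrative sketch only (settings and exception handling are simplified here and the
* {@code path.home} value is a placeholder, not a recommendation):
* <pre>{@code
* Settings settings = Settings.builder().put("path.home", "/path/to/home").build();
* try (Node node = new Node(settings)) {
*     node.start();
*     ClusterHealthResponse health = node.client().admin().cluster().prepareHealth().get();
* }
* }</pre>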
*/
public Client client() {
return client;
}
/**
* Returns the environment of the node
*/
public Environment getEnvironment() {
return environment;
}
/**
* Returns the {@link NodeEnvironment} instance of this node
*/
public NodeEnvironment getNodeEnvironment() {
return nodeEnvironment;
}
/**
* Start the node. If the node is already started, this method is a no-op.
*/
public Node start() throws NodeValidationException {
if (!lifecycle.moveToStarted()) {
return this;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ...");
pluginLifecycleComponents.forEach(LifecycleComponent::start);
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
final MetaData onDiskMetadata;
try {
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
assert onDiskMetadata != null : "metadata is null but shouldn't be"; // this is never null
} catch (IOException e) {
throw new UncheckedIOException(e);
}
validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
.filterPlugins(Plugin
.class)
.stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
// start after the transport service so the local disco node is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
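// optionally block here until a master has been discovered, bounded by
// discovery.initial_state_timeout (a value of 0 skips the wait entirely)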
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
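// when enabled (mainly used by tests and external tooling), write the bound transport and HTTP
// ports into files under the logs directory so other processes can discover them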
if (WRITE_PORTS_FILE_SETTING.get(settings)) {
if (NetworkModule.HTTP_ENABLED.get(settings)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}
logger.info("started");
pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
return this;
}
private Node stop() {
if (!lifecycle.moveToStopped()) {
return this;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("stopping ...");
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).stop();
}
injector.getInstance(SnapshotsService.class).stop();
injector.getInstance(SnapshotShardsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// close discovery early to not react to pings anymore.
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on them
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
nodeService.getMonitorService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
// if we had scroll searchers etc. or recovery going on, we wait for them to finish.
injector.getInstance(IndicesService.class).stop();
logger.info("stopped");
return this;
}
// During concurrent close() calls we want to make sure that all of them return after the node has completed its shutdown cycle.
// If not, the hook that is added in Bootstrap#setup() will be useless: close() might not be executed, in case another (for example, an API) call
// to close() has already set some lifecycles to stopped. In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("closing ...");
List<Closeable> toClose = new ArrayList<>();
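// interleave StopWatch tasks with the closeables so the time spent closing each service can be
// reported (at TRACE level) once everything is shut down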
StopWatch stopWatch = new StopWatch("node_close");
toClose.add(() -> stopWatch.start("tribe"));
toClose.add(() -> stopWatch.stop().start("node_service"));
toClose.add(nodeService);
toClose.add(() -> stopWatch.stop().start("http"));
if (NetworkModule.HTTP_ENABLED.get(settings)) {
toClose.add(injector.getInstance(HttpServerTransport.class));
}
toClose.add(() -> stopWatch.stop().start("snapshot_service"));
toClose.add(injector.getInstance(SnapshotsService.class));
toClose.add(injector.getInstance(SnapshotShardsService.class));
toClose.add(() -> stopWatch.stop().start("client"));
Releasables.close(injector.getInstance(Client.class));
toClose.add(() -> stopWatch.stop().start("indices_cluster"));
toClose.add(injector.getInstance(IndicesClusterStateService.class));
toClose.add(() -> stopWatch.stop().start("indices"));
toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(() -> stopWatch.stop().start("routing"));
toClose.add(injector.getInstance(RoutingService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
toClose.add(injector.getInstance(NodeConnectionsService.class));
toClose.add(() -> stopWatch.stop().start("discovery"));
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
toClose.add(plugin);
}
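// the Plugin instances themselves are Closeable and are closed after their lifecycle components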
toClose.addAll(pluginsService.filterPlugins(Plugin.class));
toClose.add(() -> stopWatch.stop().start("script"));
toClose.add(injector.getInstance(ScriptService.class));
toClose.add(() -> stopWatch.stop().start("thread_pool"));
// TODO this should really use ThreadPool.terminate()
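// Shut the thread pool down in three steps: request an orderly shutdown, give in-flight tasks up
// to ten seconds to drain, and finally force-terminate whatever is still running so close() does
// not hang indefinitely on a stuck task.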
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
toClose.add(() -> {
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
});
toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown"));
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
toClose.add(() -> stopWatch.stop());
toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(BigArrays.class));
if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
}
IOUtils.close(toClose);
logger.info("closed");
}
/**
* Returns <tt>true</tt> if the node is closed.
*/
public boolean isClosed() {
return lifecycle.closed();
}
public Injector injector() {
return this.injector;
}
/**
* Hook for validating the node after network
* services are started but before the cluster service is started
* and before the network service starts accepting incoming network
* requests.
*
* @param context the bootstrap context for this node
* @param boundTransportAddress the network addresses the node is
* bound and publishing to
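* @param bootstrapChecks the bootstrap checks supplied to the node; the default
* implementation is a no-op and ignores them, but overriding
* implementations may choose to run them here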
*/
@SuppressWarnings("unused")
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> bootstrapChecks) throws NodeValidationException {
}
/** Writes a file to the logs dir containing the ports for the given transport type */
private void writePortsFile(String type, BoundTransportAddress boundAddress) {
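// Write to a temporary file first and atomically move it into place below, so a reader polling
// for the ports file never observes a partially written file.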
Path tmpPortsFile = environment.logsFile().resolve(type + ".ports.tmp");
try (BufferedWriter writer = Files.newBufferedWriter(tmpPortsFile, Charset.forName("UTF-8"))) {
for (TransportAddress address : boundAddress.boundAddresses()) {
InetAddress inetAddress = InetAddress.getByName(address.getAddress());
writer.write(NetworkAddress.format(new InetSocketAddress(inetAddress, address.getPort())) + "\n");
}
} catch (IOException e) {
throw new RuntimeException("Failed to write ports file", e);
}
Path portsFile = environment.logsFile().resolve(type + ".ports");
try {
Files.move(tmpPortsFile, portsFile, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
throw new RuntimeException("Failed to rename ports file", e);
}
}
/**
* The {@link PluginsService} used to build this node's components.
*/
protected PluginsService getPluginsService() {
return pluginsService;
}
/**
* Creates a new {@link CircuitBreakerService} based on the settings provided.
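* The breaker type configured via {@link #BREAKER_TYPE_KEY} must be either {@code hierarchy}
* or {@code none}; any other value is rejected. As an illustrative sketch (the {@code settings}
* and {@code clusterSettings} variables below are assumed to be supplied by the caller):
* <pre>{@code
* CircuitBreakerService breakerService = Node.createCircuitBreakerService(settings, clusterSettings);
* }</pre>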
* @see #BREAKER_TYPE_KEY
*/
public static CircuitBreakerService createCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
String type = BREAKER_TYPE_KEY.get(settings);
if (type.equals("hierarchy")) {
return new HierarchyCircuitBreakerService(settings, clusterSettings);
} else if (type.equals("none")) {
return new NoneCircuitBreakerService();
} else {
throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
}
}
/**
* Creates a new {@link BigArrays} instance used for this node.
* This method can be overridden by subclasses to change the {@link BigArrays} implementation, for instance for testing.
*/
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
return new BigArrays(pageCacheRecycler, circuitBreakerService);
}
/**
* Creates a new {@link PageCacheRecycler} instance used for this node.
* This method can be overridden by subclasses to change the {@link PageCacheRecycler} implementation, for instance for testing.
*/
PageCacheRecycler createPageCacheRecycler(Settings settings) {
return new PageCacheRecycler(settings);
}
/**
* Creates a new {@link SearchService}. This method can be overridden by tests to inject mock implementations.
*/
protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
FetchPhase fetchPhase, ResponseCollectorService responseCollectorService) {
return new SearchService(clusterService, indicesService, threadPool,
scriptService, bigArrays, fetchPhase, responseCollectorService);
}
/**
* Gets the list of custom name resolvers contributed by the given discovery plugins.
* @param discoveryPlugins the discovery plugins to query for custom name resolvers
*/
private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<DiscoveryPlugin> discoveryPlugins) {
List<NetworkService.CustomNameResolver> customNameResolvers = new ArrayList<>();
for (DiscoveryPlugin discoveryPlugin : discoveryPlugins) {
NetworkService.CustomNameResolver customNameResolver = discoveryPlugin.getCustomNameResolver(settings);
if (customNameResolver != null) {
customNameResolvers.add(customNameResolver);
}
}
return customNameResolvers;
}
/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
}
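/**
* Builds the local {@link DiscoveryNode} once the transport layer reports the addresses it is
* bound to and publishing. The node is created exactly once (guarded by a {@link SetOnce}) and
* can subsequently be retrieved via {@link #getNode()}.
*/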
private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
private final SetOnce<DiscoveryNode> localNode = new SetOnce<>();
private final String persistentNodeId;
private final Settings settings;
private LocalNodeFactory(Settings settings, String persistentNodeId) {
this.persistentNodeId = persistentNodeId;
this.settings = settings;
}
@Override
public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
return localNode.get();
}
DiscoveryNode getNode() {
assert localNode.get() != null;
return localNode.get();
}
}
}