rename cluster settings service to node settings service, as it better reflects the fact that settings are applied on the node level
This commit is contained in:
parent
03217c460a
commit
6e81fbc30d
|
@ -37,7 +37,6 @@ import org.elasticsearch.cluster.routing.RoutingService;
|
|||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
|
||||
import org.elasticsearch.cluster.routing.operation.OperationRoutingModule;
|
||||
import org.elasticsearch.cluster.service.InternalClusterService;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
|
@ -63,8 +62,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
|||
protected void configure() {
|
||||
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
|
||||
|
||||
bind(ClusterSettingsService.class).asEagerSingleton();
|
||||
|
||||
bind(MetaDataCreateIndexService.class).asEagerSingleton();
|
||||
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
|
||||
bind(MetaDataStateIndexService.class).asEagerSingleton();
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.elasticsearch.cluster.routing.allocation;
|
|||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -37,11 +37,11 @@ public class NodeAllocations extends NodeAllocation {
|
|||
|
||||
private final NodeAllocation[] allocations;
|
||||
|
||||
public NodeAllocations(Settings settings, ClusterSettingsService clusterSettingsService) {
|
||||
public NodeAllocations(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
this(settings, ImmutableSet.<NodeAllocation>builder()
|
||||
.add(new SameShardNodeAllocation(settings))
|
||||
.add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
|
||||
.add(new ThrottlingNodeAllocation(settings, clusterSettingsService))
|
||||
.add(new ThrottlingNodeAllocation(settings, nodeSettingsService))
|
||||
.add(new RebalanceOnlyWhenActiveNodeAllocation(settings))
|
||||
.add(new ClusterRebalanceNodeAllocation(settings))
|
||||
.add(new ConcurrentRebalanceNodeAllocation(settings))
|
||||
|
|
|
@ -28,11 +28,11 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
|
|||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -53,7 +53,7 @@ public class ShardsAllocation extends AbstractComponent {
|
|||
}
|
||||
|
||||
public ShardsAllocation(Settings settings) {
|
||||
this(settings, new NodeAllocations(settings, new ClusterSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)));
|
||||
this(settings, new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)));
|
||||
}
|
||||
|
||||
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) {
|
||||
|
|
|
@ -24,9 +24,9 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -43,14 +43,14 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
|
|||
private volatile int primariesInitialRecoveries;
|
||||
private volatile int concurrentRecoveries;
|
||||
|
||||
@Inject public ThrottlingNodeAllocation(Settings settings, ClusterSettingsService clusterSettingsService) {
|
||||
@Inject public ThrottlingNodeAllocation(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
|
||||
this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4);
|
||||
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2));
|
||||
logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
|
||||
|
||||
clusterSettingsService.addListener(new ApplySettings());
|
||||
nodeSettingsService.addListener(new ApplySettings());
|
||||
}
|
||||
|
||||
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
|
@ -95,7 +95,7 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
|
|||
}
|
||||
}
|
||||
|
||||
class ApplySettings implements ClusterSettingsService.Listener {
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingNodeAllocation.this.primariesInitialRecoveries);
|
||||
if (primariesInitialRecoveries != ThrottlingNodeAllocation.this.primariesInitialRecoveries) {
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.operation.OperationRouting;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -43,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -71,7 +71,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
|
||||
private final TransportService transportService;
|
||||
|
||||
private final ClusterSettingsService clusterSettingsService;
|
||||
private final NodeSettingsService nodeSettingsService;
|
||||
|
||||
private final TimeValue reconnectInterval;
|
||||
|
||||
|
@ -89,21 +89,21 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
private volatile ScheduledFuture reconnectToNodes;
|
||||
|
||||
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService,
|
||||
ClusterSettingsService clusterSettingsService, ThreadPool threadPool) {
|
||||
NodeSettingsService nodeSettingsService, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.operationRouting = operationRouting;
|
||||
this.transportService = transportService;
|
||||
this.discoveryService = discoveryService;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterSettingsService = clusterSettingsService;
|
||||
this.nodeSettingsService = nodeSettingsService;
|
||||
|
||||
this.clusterSettingsService.setClusterService(this);
|
||||
this.nodeSettingsService.setClusterService(this);
|
||||
|
||||
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(10));
|
||||
}
|
||||
|
||||
public ClusterSettingsService settingsService() {
|
||||
return this.clusterSettingsService;
|
||||
public NodeSettingsService settingsService() {
|
||||
return this.nodeSettingsService;
|
||||
}
|
||||
|
||||
public void addInitialStateBlock(ClusterBlock block) throws ElasticSearchIllegalStateException {
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
|
@ -49,6 +48,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
|
|||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -109,7 +109,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
private final AtomicBoolean initialStateSent = new AtomicBoolean();
|
||||
|
||||
@Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
|
||||
TransportService transportService, ClusterService clusterService, ClusterSettingsService clusterSettingsService,
|
||||
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
|
||||
ZenPingService pingService) {
|
||||
super(settings);
|
||||
this.clusterName = clusterName;
|
||||
|
@ -124,7 +124,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
|
||||
logger.debug("using ping.timeout [{}]", pingTimeout);
|
||||
|
||||
this.electMaster = new ElectMasterService(settings, clusterSettingsService);
|
||||
this.electMaster = new ElectMasterService(settings, nodeSettingsService);
|
||||
|
||||
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
|
||||
this.masterFD.addListener(new MasterNodeFailureListener());
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.elasticsearch.discovery.zen.elect;
|
|||
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -44,11 +44,11 @@ public class ElectMasterService extends AbstractComponent {
|
|||
|
||||
private volatile int minimumMasterNodes;
|
||||
|
||||
public ElectMasterService(Settings settings, ClusterSettingsService clusterSettingsService) {
|
||||
public ElectMasterService(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
this.minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
|
||||
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
|
||||
clusterSettingsService.addListener(new ApplySettings());
|
||||
nodeSettingsService.addListener(new ApplySettings());
|
||||
}
|
||||
|
||||
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
|
||||
|
@ -111,7 +111,7 @@ public class ElectMasterService extends AbstractComponent {
|
|||
return possibleNodes;
|
||||
}
|
||||
|
||||
class ApplySettings implements ClusterSettingsService.Listener {
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
int minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", ElectMasterService.this.minimumMasterNodes);
|
||||
if (minimumMasterNodes != ElectMasterService.this.minimumMasterNodes) {
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.index.shard.recovery;
|
|||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
|
@ -43,6 +42,7 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
|
|||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -91,7 +91,7 @@ public class RecoverySource extends AbstractComponent {
|
|||
private final ThreadPoolExecutor concurrentStreamPool;
|
||||
|
||||
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
|
||||
ClusterSettingsService clusterSettingsService) {
|
||||
NodeSettingsService nodeSettingsService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
|
@ -110,7 +110,7 @@ public class RecoverySource extends AbstractComponent {
|
|||
|
||||
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
|
||||
|
||||
clusterSettingsService.addListener(new ApplySettings());
|
||||
nodeSettingsService.addListener(new ApplySettings());
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@ -312,7 +312,7 @@ public class RecoverySource extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
class ApplySettings implements ClusterSettingsService.Listener {
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
int concurrentStreams = settings.getAsInt("index.shard.recovery.concurrent_streams", RecoverySource.this.concurrentStreams);
|
||||
if (concurrentStreams != RecoverySource.this.concurrentStreams) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.node.internal;
|
|||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -35,5 +36,6 @@ public class NodeModule extends AbstractModule {
|
|||
|
||||
@Override protected void configure() {
|
||||
bind(Node.class).toInstance(node);
|
||||
bind(NodeSettingsService.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.settings;
|
||||
package org.elasticsearch.node.settings;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -31,14 +31,16 @@ import java.util.Map;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
* A service that allows to register for node settings change that can come from cluster
|
||||
* events holding new settings.
|
||||
*/
|
||||
public class ClusterSettingsService extends AbstractComponent implements ClusterStateListener {
|
||||
public class NodeSettingsService extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private volatile Settings lastSettingsApplied;
|
||||
|
||||
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
|
||||
|
||||
@Inject public ClusterSettingsService(Settings settings) {
|
||||
@Inject public NodeSettingsService(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.discovery.ec2;
|
|||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.settings.ClusterSettingsService;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -30,6 +29,7 @@ import org.elasticsearch.discovery.zen.ZenDiscovery;
|
|||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -39,8 +39,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
public class Ec2Discovery extends ZenDiscovery {
|
||||
|
||||
@Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ClusterSettingsService clusterSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) {
|
||||
super(settings, clusterName, threadPool, transportService, clusterService, clusterSettingsService, pingService);
|
||||
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) {
|
||||
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, pingService);
|
||||
if (settings.getAsBoolean("cloud.enabled", true)) {
|
||||
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
|
||||
UnicastZenPing unicastZenPing = null;
|
||||
|
|
Loading…
Reference in New Issue