From 03217c460abcca893c00a6a09fc6bc2dcc19e410 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 20 Aug 2011 04:00:41 +0300 Subject: [PATCH] Cluster Update Settings API, closes #1266. --- .../action/TransportActionModule.java | 2 + .../action/TransportActions.java | 1 + .../ClusterUpdateSettingsRequest.java | 130 ++++++++++++++++++ .../ClusterUpdateSettingsResponse.java | 42 ++++++ .../TransportClusterUpdateSettingsAction.java | 126 +++++++++++++++++ .../client/ClusterAdminClient.java | 18 +++ .../org/elasticsearch/client/Requests.java | 5 + .../ClusterUpdateSettingsRequestBuilder.java | 99 +++++++++++++ .../client/node/NodeClusterAdminClient.java | 16 ++- .../support/AbstractClusterAdminClient.java | 5 + .../action/ClientTransportActionModule.java | 2 + ...tTransportClusterUpdateSettingsAction.java | 41 ++++++ .../InternalTransportClusterAdminClient.java | 24 +++- .../elasticsearch/cluster/ClusterModule.java | 19 ++- .../cluster/metadata/MetaData.java | 78 ++++++++++- .../routing/allocation/NodeAllocations.java | 5 +- .../routing/allocation/ShardsAllocation.java | 3 +- .../allocation/ThrottlingNodeAllocation.java | 33 ++++- .../service/InternalClusterService.java | 13 +- .../settings/ClusterSettingsService.java | 99 +++++++++++++ .../common/logging/ESLogger.java | 2 + .../common/logging/jdk/JdkESLogger.java | 14 ++ .../common/logging/log4j/Log4jESLogger.java | 14 ++ .../common/logging/slf4j/Slf4jESLogger.java | 4 + .../discovery/zen/ZenDiscovery.java | 5 +- .../zen/elect/ElectMasterService.java | 22 ++- .../elasticsearch/gateway/GatewayService.java | 2 + .../index/shard/recovery/RecoverySource.java | 37 ++++- .../rest/XContentRestResponse.java | 6 - .../rest/action/RestActionModule.java | 4 + .../RestClusterGetSettingsAction.java | 90 ++++++++++++ .../RestClusterUpdateSettingsAction.java | 95 +++++++++++++ .../discovery/ec2/Ec2Discovery.java | 5 +- 33 files changed, 1029 insertions(+), 32 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/settings/ClusterUpdateSettingsRequestBuilder.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/settings/ClientTransportClusterUpdateSettingsAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/settings/ClusterSettingsService.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterGetSettingsAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterUpdateSettingsAction.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index e991a323213..58224308d95 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexRep import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction; import org.elasticsearch.action.admin.cluster.ping.replication.TransportShardReplicationPingAction; import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction; +import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; import org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction; @@ -90,6 +91,7 @@ public class TransportActionModule extends AbstractModule { bind(TransportNodesRestartAction.class).asEagerSingleton(); bind(TransportClusterStateAction.class).asEagerSingleton(); bind(TransportClusterHealthAction.class).asEagerSingleton(); + bind(TransportClusterUpdateSettingsAction.class).asEagerSingleton(); bind(TransportSinglePingAction.class).asEagerSingleton(); bind(TransportBroadcastPingAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java index 05d14115782..0c9949bb63f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java @@ -85,6 +85,7 @@ public class TransportActions { public static final String STATE = "/cluster/state"; public static final String HEALTH = "/cluster/health"; + public static final String UPDATE_SETTINGS = "/cluster/updateSettings"; public static class Node { public static final String INFO = "/cluster/nodes/info"; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequest.java new file mode 100644 index 00000000000..0ce4b2253f5 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequest.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.settings; + +import org.elasticsearch.ElasticSearchGenerationException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.action.Actions.*; +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; + +/** + */ +public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest { + + private Settings transientSettings = EMPTY_SETTINGS; + private Settings persistentSettings = EMPTY_SETTINGS; + + public ClusterUpdateSettingsRequest() { + } + + @Override public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (transientSettings.getAsMap().isEmpty() && persistentSettings.getAsMap().isEmpty()) { + validationException = addValidationError("no settings to update", validationException); + } + return validationException; + } + + Settings transientSettings() { + return transientSettings; + } + + Settings persistentSettings() { + return persistentSettings; + } + + public ClusterUpdateSettingsRequest transientSettings(Settings settings) { + this.transientSettings = settings; + return this; + } + + public ClusterUpdateSettingsRequest transientSettings(Settings.Builder settings) { + this.transientSettings = settings.build(); + return this; + } + + public ClusterUpdateSettingsRequest transientSettings(String source) { + this.transientSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build(); + return this; + } + + public ClusterUpdateSettingsRequest transientSettings(Map source) { + try { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.map(source); + transientSettings(builder.string()); + } catch (IOException e) { + throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + public ClusterUpdateSettingsRequest persistentSettings(Settings settings) { + this.persistentSettings = settings; + return this; + } + + public ClusterUpdateSettingsRequest persistentSettings(Settings.Builder settings) { + this.persistentSettings = settings.build(); + return this; + } + + public ClusterUpdateSettingsRequest persistentSettings(String source) { + this.persistentSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build(); + return this; + } + + public ClusterUpdateSettingsRequest persistentSettings(Map source) { + try { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.map(source); + persistentSettings(builder.string()); + } catch (IOException e) { + throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + transientSettings = readSettingsFromStream(in); + persistentSettings = readSettingsFromStream(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeSettingsToStream(transientSettings, out); + writeSettingsToStream(persistentSettings, out); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java new file mode 100644 index 00000000000..44b527e8a25 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.settings; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; + +import java.io.IOException; + +/** + * A response for a cluster update settings action. + */ +public class ClusterUpdateSettingsResponse implements ActionResponse, Streamable { + + ClusterUpdateSettingsResponse() { + } + + @Override public void readFrom(StreamInput in) throws IOException { + } + + @Override public void writeTo(StreamOutput out) throws IOException { + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java new file mode 100644 index 00000000000..35ffd355d82 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -0,0 +1,126 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.settings; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author kimchy (shay.banon) + */ +public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOperationAction { + + @Inject public TransportClusterUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(settings, transportService, clusterService, threadPool); + } + + @Override protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override protected String transportAction() { + return TransportActions.Admin.Cluster.UPDATE_SETTINGS; + } + + @Override protected ClusterUpdateSettingsRequest newRequest() { + return new ClusterUpdateSettingsRequest(); + } + + @Override protected ClusterUpdateSettingsResponse newResponse() { + return new ClusterUpdateSettingsResponse(); + } + + @Override protected ClusterUpdateSettingsResponse masterOperation(final ClusterUpdateSettingsRequest request, ClusterState state) throws ElasticSearchException { + final AtomicReference failureRef = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + + clusterService.submitStateUpdateTask("cluster_update_settings", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { + try { + boolean changed = false; + ImmutableSettings.Builder transientSettings = ImmutableSettings.settingsBuilder(); + transientSettings.put(currentState.metaData().transientSettings()); + for (Map.Entry entry : request.transientSettings().getAsMap().entrySet()) { + if (MetaData.dynamicSettings().contains(entry.getKey()) || entry.getKey().startsWith("logger.")) { + transientSettings.put(entry.getKey(), entry.getValue()); + changed = true; + } else { + logger.warn("ignoring transient setting [{}], not dynamically updateable", entry.getKey()); + } + } + + ImmutableSettings.Builder persistentSettings = ImmutableSettings.settingsBuilder(); + persistentSettings.put(currentState.metaData().persistentSettings()); + for (Map.Entry entry : request.persistentSettings().getAsMap().entrySet()) { + if (MetaData.dynamicSettings().contains(entry.getKey()) || entry.getKey().startsWith("logger.")) { + changed = true; + persistentSettings.put(entry.getKey(), entry.getValue()); + } else { + logger.warn("ignoring persistent setting [{}], not dynamically updateable", entry.getKey()); + } + } + + if (!changed) { + return currentState; + } + + MetaData.Builder metaData = MetaData.builder().metaData(currentState.metaData()) + .persistentSettings(persistentSettings.build()) + .transientSettings(transientSettings.build()); + + + return ClusterState.builder().state(currentState).metaData(metaData).build(); + } finally { + latch.countDown(); + } + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + failureRef.set(e); + } + + if (failureRef.get() != null) { + if (failureRef.get() instanceof ElasticSearchException) { + throw (ElasticSearchException) failureRef.get(); + } else { + throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get()); + } + } + + return new ClusterUpdateSettingsResponse(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index e90e5a2d593..bee4f3d15d4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRe import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -47,6 +49,7 @@ import org.elasticsearch.client.action.admin.cluster.node.stats.NodesStatsReques import org.elasticsearch.client.action.admin.cluster.ping.broadcast.BroadcastPingRequestBuilder; import org.elasticsearch.client.action.admin.cluster.ping.replication.ReplicationPingRequestBuilder; import org.elasticsearch.client.action.admin.cluster.ping.single.SinglePingRequestBuilder; +import org.elasticsearch.client.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder; import org.elasticsearch.client.action.admin.cluster.state.ClusterStateRequestBuilder; /** @@ -103,6 +106,21 @@ public interface ClusterAdminClient { */ ClusterStateRequestBuilder prepareState(); + /** + * Updates settings in the cluster. + */ + ActionFuture updateSettings(ClusterUpdateSettingsRequest request); + + /** + * Update settings in the cluster. + */ + void updateSettings(ClusterUpdateSettingsRequest request, ActionListener listener); + + /** + * Update settings in the cluster. + */ + ClusterUpdateSettingsRequestBuilder prepareUpdateSettings(); + /** * Nodes info of the cluster. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java index 370ff31a146..f3df746817b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest; import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; @@ -353,6 +354,10 @@ public class Requests { return new ClusterStateRequest(); } + public static ClusterUpdateSettingsRequest clusterUpdateSettingsRequest() { + return new ClusterUpdateSettingsRequest(); + } + /** * Creates a cluster health request. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/settings/ClusterUpdateSettingsRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/settings/ClusterUpdateSettingsRequestBuilder.java new file mode 100644 index 00000000000..a6bb92368be --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/cluster/settings/ClusterUpdateSettingsRequestBuilder.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.action.admin.cluster.settings; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.action.admin.cluster.support.BaseClusterRequestBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Map; + +/** + */ +public class ClusterUpdateSettingsRequestBuilder extends BaseClusterRequestBuilder { + + public ClusterUpdateSettingsRequestBuilder(ClusterAdminClient clusterClient) { + super(clusterClient, new ClusterUpdateSettingsRequest()); + } + + public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings settings) { + request.transientSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings.Builder settings) { + request.transientSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setTransientSettings(String settings) { + request.transientSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setTransientSettings(Map settings) { + request.transientSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings settings) { + request.persistentSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings.Builder settings) { + request.persistentSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setPersistentSettings(String settings) { + request.persistentSettings(settings); + return this; + } + + public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Map settings) { + request.persistentSettings(settings); + return this; + } + + /** + * Sets the master node timeout in case the master has not yet been discovered. + */ + public ClusterUpdateSettingsRequestBuilder setMasterNodeTimeout(TimeValue timeout) { + request.masterNodeTimeout(timeout); + return this; + } + + /** + * Sets the master node timeout in case the master has not yet been discovered. + */ + public ClusterUpdateSettingsRequestBuilder setMasterNodeTimeout(String timeout) { + request.masterNodeTimeout(timeout); + return this; + } + + @Override protected void doExecute(ActionListener listener) { + client.updateSettings(request, listener); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java index abec093206b..5adac35a08d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java @@ -45,6 +45,9 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicat import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse; import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; @@ -65,6 +68,8 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement private final TransportClusterStateAction clusterStateAction; + private final TransportClusterUpdateSettingsAction clusterUpdateSettingsAction; + private final TransportSinglePingAction singlePingAction; private final TransportBroadcastPingAction broadcastPingAction; @@ -80,12 +85,13 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement private final TransportNodesRestartAction nodesRestart; @Inject public NodeClusterAdminClient(Settings settings, ThreadPool threadPool, - TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction, + TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction, TransportClusterUpdateSettingsAction clusterUpdateSettingsAction, TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction, TransportNodesInfoAction nodesInfoAction, TransportNodesShutdownAction nodesShutdown, TransportNodesRestartAction nodesRestart, TransportNodesStatsAction nodesStatsAction) { this.threadPool = threadPool; this.clusterHealthAction = clusterHealthAction; this.clusterStateAction = clusterStateAction; + this.clusterUpdateSettingsAction = clusterUpdateSettingsAction; this.nodesInfoAction = nodesInfoAction; this.nodesShutdown = nodesShutdown; this.nodesRestart = nodesRestart; @@ -115,6 +121,14 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement clusterStateAction.execute(request, listener); } + @Override public ActionFuture updateSettings(ClusterUpdateSettingsRequest request) { + return clusterUpdateSettingsAction.execute(request); + } + + @Override public void updateSettings(ClusterUpdateSettingsRequest request, ActionListener listener) { + clusterUpdateSettingsAction.execute(request, listener); + } + @Override public ActionFuture ping(SinglePingRequest request) { return singlePingAction.execute(request); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java index 3ce8e22449b..a2499cd552d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.action.admin.cluster.node.stats.NodesStatsReques import org.elasticsearch.client.action.admin.cluster.ping.broadcast.BroadcastPingRequestBuilder; import org.elasticsearch.client.action.admin.cluster.ping.replication.ReplicationPingRequestBuilder; import org.elasticsearch.client.action.admin.cluster.ping.single.SinglePingRequestBuilder; +import org.elasticsearch.client.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder; import org.elasticsearch.client.action.admin.cluster.state.ClusterStateRequestBuilder; import org.elasticsearch.client.internal.InternalClusterAdminClient; @@ -43,6 +44,10 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin return new ClusterStateRequestBuilder(this); } + @Override public ClusterUpdateSettingsRequestBuilder prepareUpdateSettings() { + return new ClusterUpdateSettingsRequestBuilder(this); + } + @Override public NodesInfoRequestBuilder prepareNodesInfo(String... nodesIds) { return new NodesInfoRequestBuilder(this).setNodesIds(nodesIds); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java index 9f96e2d51e5..77c15395628 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.node.stats.Client import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction; +import org.elasticsearch.client.transport.action.admin.cluster.settings.ClientTransportClusterUpdateSettingsAction; import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction; import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction; import org.elasticsearch.client.transport.action.admin.indices.analyze.ClientTransportAnalyzeAction; @@ -105,5 +106,6 @@ public class ClientTransportActionModule extends AbstractModule { bind(ClientTransportBroadcastPingAction.class).asEagerSingleton(); bind(ClientTransportClusterStateAction.class).asEagerSingleton(); bind(ClientTransportClusterHealthAction.class).asEagerSingleton(); + bind(ClientTransportClusterUpdateSettingsAction.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/settings/ClientTransportClusterUpdateSettingsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/settings/ClientTransportClusterUpdateSettingsAction.java new file mode 100644 index 00000000000..d34ece63232 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/settings/ClientTransportClusterUpdateSettingsAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport.action.admin.cluster.settings; + +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.transport.action.support.BaseClientTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; + +/** + */ +public class ClientTransportClusterUpdateSettingsAction extends BaseClientTransportAction { + + @Inject public ClientTransportClusterUpdateSettingsAction(Settings settings, TransportService transportService) { + super(settings, transportService, ClusterUpdateSettingsResponse.class); + } + + @Override protected String action() { + return TransportActions.Admin.Cluster.UPDATE_SETTINGS; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java index 2ee5a89ef70..d3ff80187aa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java @@ -38,6 +38,8 @@ import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRe import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest; import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.internal.InternalClusterAdminClient; @@ -51,6 +53,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.node.stats.Client import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction; +import org.elasticsearch.client.transport.action.admin.cluster.settings.ClientTransportClusterUpdateSettingsAction; import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -70,6 +73,8 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli private final ClientTransportClusterStateAction clusterStateAction; + private final ClientTransportClusterUpdateSettingsAction clusterUpdateSettingsAction; + private final ClientTransportSinglePingAction singlePingAction; private final ClientTransportReplicationPingAction replicationPingAction; @@ -85,13 +90,14 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli private final ClientTransportNodesRestartAction nodesRestartAction; @Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, - ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction, + ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction, ClientTransportClusterUpdateSettingsAction clusterUpdateSettingsAction, ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction, ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction, ClientTransportNodesRestartAction nodesRestartAction, ClientTransportNodesStatsAction nodesStatsAction) { this.nodesService = nodesService; this.threadPool = threadPool; this.clusterHealthAction = clusterHealthAction; this.clusterStateAction = clusterStateAction; + this.clusterUpdateSettingsAction = clusterUpdateSettingsAction; this.nodesInfoAction = nodesInfoAction; this.nodesShutdownAction = nodesShutdownAction; this.nodesRestartAction = nodesRestartAction; @@ -137,6 +143,22 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli }, listener); } + @Override public ActionFuture updateSettings(final ClusterUpdateSettingsRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(DiscoveryNode node) throws ElasticSearchException { + return clusterUpdateSettingsAction.execute(node, request); + } + }); + } + + @Override public void updateSettings(final ClusterUpdateSettingsRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { + @Override public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticSearchException { + clusterUpdateSettingsAction.execute(node, request, listener); + } + }, listener); + } + @Override public ActionFuture ping(final SinglePingRequest request) { return nodesService.execute(new TransportClientNodesService.NodeCallback>() { @Override public ActionFuture doWithNode(DiscoveryNode node) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 3d8c23d7f5f..6f89f82fe72 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -19,13 +19,25 @@ package org.elasticsearch.cluster; -import org.elasticsearch.cluster.action.index.*; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction; +import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; +import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; +import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.*; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService; +import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; +import org.elasticsearch.cluster.metadata.MetaDataMappingService; +import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule; import org.elasticsearch.cluster.routing.operation.OperationRoutingModule; import org.elasticsearch.cluster.service.InternalClusterService; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -50,6 +62,9 @@ public class ClusterModule extends AbstractModule implements SpawnModules { @Override protected void configure() { bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); + + bind(ClusterSettingsService.class).asEagerSingleton(); + bind(MetaDataCreateIndexService.class).asEagerSingleton(); bind(MetaDataDeleteIndexService.class).asEagerSingleton(); bind(MetaDataStateIndexService.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index b03a9f8b7ac..bbd8fda3856 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.UnmodifiableIterator; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.set.hash.THashSet; import org.elasticsearch.common.util.concurrent.Immutable; @@ -43,6 +44,7 @@ import org.elasticsearch.indices.IndexMissingException; import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,10 +61,26 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*; @Immutable public class MetaData implements Iterable { + private static ImmutableSet dynamicSettings = ImmutableSet.builder() + .build(); + + public static ImmutableSet dynamicSettings() { + return dynamicSettings; + } + + public static synchronized void addDynamicSettings(String... settings) { + HashSet updatedSettings = new HashSet(dynamicSettings); + updatedSettings.addAll(Arrays.asList(settings)); + dynamicSettings = ImmutableSet.copyOf(updatedSettings); + } + public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build(); private final long version; + private final Settings transientSettings; + private final Settings persistentSettings; + private final Settings settings; private final ImmutableMap indices; private final ImmutableMap templates; @@ -80,8 +98,11 @@ public class MetaData implements Iterable { private final ImmutableMap aliasAndIndexToIndexMap; - private MetaData(long version, ImmutableMap indices, ImmutableMap templates) { + MetaData(long version, Settings transientSettings, Settings persistentSettings, ImmutableMap indices, ImmutableMap templates) { this.version = version; + this.transientSettings = transientSettings; + this.persistentSettings = persistentSettings; + this.settings = ImmutableSettings.settingsBuilder().put(persistentSettings).put(transientSettings).build(); this.indices = ImmutableMap.copyOf(indices); this.templates = templates; int totalNumberOfShards = 0; @@ -194,6 +215,21 @@ public class MetaData implements Iterable { return this.version; } + /** + * Returns the merges transient and persistent settings. + */ + public Settings settings() { + return this.settings; + } + + public Settings transientSettings() { + return this.transientSettings; + } + + public Settings persistentSettings() { + return this.persistentSettings; + } + public ImmutableMap> aliases() { return this.aliases; } @@ -601,11 +637,16 @@ public class MetaData implements Iterable { private long version; + private Settings transientSettings = ImmutableSettings.Builder.EMPTY_SETTINGS; + private Settings persistentSettings = ImmutableSettings.Builder.EMPTY_SETTINGS; + private MapBuilder indices = newMapBuilder(); private MapBuilder templates = newMapBuilder(); public Builder metaData(MetaData metaData) { + this.transientSettings = metaData.transientSettings; + this.persistentSettings = metaData.persistentSettings; this.version = metaData.version; this.indices.putAll(metaData.indices); this.templates.putAll(metaData.templates); @@ -674,13 +715,23 @@ public class MetaData implements Iterable { return this; } + public Builder transientSettings(Settings settings) { + this.transientSettings = settings; + return this; + } + + public Builder persistentSettings(Settings settings) { + this.persistentSettings = settings; + return this; + } + public Builder version(long version) { this.version = version; return this; } public MetaData build() { - return new MetaData(version, indices.immutableMap(), templates.immutableMap()); + return new MetaData(version, transientSettings, persistentSettings, indices.immutableMap(), templates.immutableMap()); } public static String toXContent(MetaData metaData) throws IOException { @@ -694,6 +745,14 @@ public class MetaData implements Iterable { public static void toXContent(MetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject("meta-data"); + if (!metaData.persistentSettings().getAsMap().isEmpty()) { + builder.startObject("settings"); + for (Map.Entry entry : metaData.persistentSettings().getAsMap().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + } + builder.startObject("templates"); for (IndexTemplateMetaData template : metaData.templates().values()) { IndexTemplateMetaData.Builder.toXContent(template, builder, params); @@ -727,7 +786,16 @@ public class MetaData implements Iterable { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token == XContentParser.Token.START_OBJECT) { - if ("indices".equals(currentFieldName)) { + if ("settings".equals(currentFieldName)) { + ImmutableSettings.Builder settingsBuilder = settingsBuilder(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + String key = parser.currentName(); + token = parser.nextToken(); + String value = parser.text(); + settingsBuilder.put(key, value); + } + builder.persistentSettings(settingsBuilder.build()); + } else if ("indices".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { builder.put(IndexMetaData.Builder.fromXContent(parser)); } @@ -744,6 +812,8 @@ public class MetaData implements Iterable { public static MetaData readFrom(StreamInput in) throws IOException { Builder builder = new Builder(); builder.version = in.readLong(); + builder.transientSettings(readSettingsFromStream(in)); + builder.persistentSettings(readSettingsFromStream(in)); int size = in.readVInt(); for (int i = 0; i < size; i++) { builder.put(IndexMetaData.Builder.readFrom(in)); @@ -757,6 +827,8 @@ public class MetaData implements Iterable { public static void writeTo(MetaData metaData, StreamOutput out) throws IOException { out.writeLong(metaData.version); + writeSettingsToStream(metaData.transientSettings(), out); + writeSettingsToStream(metaData.persistentSettings(), out); out.writeVInt(metaData.indices.size()); for (IndexMetaData indexMetaData : metaData) { IndexMetaData.Builder.writeTo(indexMetaData, out); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index 3de79a6b1cb..df07eee1587 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -36,11 +37,11 @@ public class NodeAllocations extends NodeAllocation { private final NodeAllocation[] allocations; - public NodeAllocations(Settings settings) { + public NodeAllocations(Settings settings, ClusterSettingsService clusterSettingsService) { this(settings, ImmutableSet.builder() .add(new SameShardNodeAllocation(settings)) .add(new ReplicaAfterPrimaryActiveNodeAllocation(settings)) - .add(new ThrottlingNodeAllocation(settings)) + .add(new ThrottlingNodeAllocation(settings, clusterSettingsService)) .add(new RebalanceOnlyWhenActiveNodeAllocation(settings)) .add(new ClusterRebalanceNodeAllocation(settings)) .add(new ConcurrentRebalanceNodeAllocation(settings)) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index 966819a3fcd..e435dd2bc62 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; @@ -52,7 +53,7 @@ public class ShardsAllocation extends AbstractComponent { } public ShardsAllocation(Settings settings) { - this(settings, new NodeAllocations(settings)); + this(settings, new NodeAllocations(settings, new ClusterSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS))); } @Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java index f0b58661593..1a502c9c41b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java @@ -19,10 +19,12 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -31,15 +33,24 @@ import org.elasticsearch.common.settings.Settings; */ public class ThrottlingNodeAllocation extends NodeAllocation { - private final int primariesInitialRecoveries; - private final int concurrentRecoveries; + static { + MetaData.addDynamicSettings( + "cluster.routing.allocation.node_initial_primaries_recoveries", + "cluster.routing.allocation.node_concurrent_recoveries" + ); + } - @Inject public ThrottlingNodeAllocation(Settings settings) { + private volatile int primariesInitialRecoveries; + private volatile int concurrentRecoveries; + + @Inject public ThrottlingNodeAllocation(Settings settings, ClusterSettingsService clusterSettingsService) { super(settings); this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4); this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2)); logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries); + + clusterSettingsService.addListener(new ApplySettings()); } @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { @@ -83,4 +94,20 @@ public class ThrottlingNodeAllocation extends NodeAllocation { return Decision.YES; } } + + class ApplySettings implements ClusterSettingsService.Listener { + @Override public void onRefreshSettings(Settings settings) { + int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingNodeAllocation.this.primariesInitialRecoveries); + if (primariesInitialRecoveries != ThrottlingNodeAllocation.this.primariesInitialRecoveries) { + logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.primariesInitialRecoveries, primariesInitialRecoveries); + ThrottlingNodeAllocation.this.primariesInitialRecoveries = primariesInitialRecoveries; + } + + int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingNodeAllocation.this.concurrentRecoveries); + if (concurrentRecoveries != ThrottlingNodeAllocation.this.concurrentRecoveries) { + logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.concurrentRecoveries, concurrentRecoveries); + ThrottlingNodeAllocation.this.concurrentRecoveries = concurrentRecoveries; + } + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 552a543d07b..7911ba270cd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -70,6 +71,8 @@ public class InternalClusterService extends AbstractLifecycleComponent listeners = new CopyOnWriteArrayList(); + + @Inject public ClusterSettingsService(Settings settings) { + super(settings); + } + + // inject it as a member, so we won't get into possible cyclic problems + public void setClusterService(ClusterService clusterService) { + clusterService.add(this); + } + + @Override public void clusterChanged(ClusterChangedEvent event) { + // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency + if (event.state().blocks().disableStatePersistence()) { + return; + } + + if (!event.metaDataChanged()) { + // nothing changed in the metadata, no need to check + return; + } + + if (lastSettingsApplied != null && event.state().metaData().settings().equals(lastSettingsApplied)) { + // nothing changed in the settings, ignore + return; + } + + for (Listener listener : listeners) { + try { + listener.onRefreshSettings(event.state().metaData().settings()); + } catch (Exception e) { + logger.warn("failed to refresh settings for [{}]", e, listener); + } + } + + try { + for (Map.Entry entry : event.state().metaData().settings().getAsMap().entrySet()) { + if (entry.getKey().startsWith("logger.")) { + String component = entry.getKey().substring("logger.".length()); + ESLoggerFactory.getLogger(component, entry.getValue()).setLevel(entry.getValue()); + } + } + } catch (Exception e) { + logger.warn("failed to refresh settings for [{}]", e, "logger"); + } + + lastSettingsApplied = event.state().metaData().settings(); + } + + public void addListener(Listener listener) { + this.listeners.add(listener); + } + + public void removeListener(Listener listener) { + this.listeners.remove(listener); + } + + public static interface Listener { + void onRefreshSettings(Settings settings); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/ESLogger.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/ESLogger.java index 5ad6cffefc4..d270d31c292 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/ESLogger.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/ESLogger.java @@ -28,6 +28,8 @@ public interface ESLogger { String getName(); + void setLevel(String level); + /** * Returns {@code true} if a TRACE level message is logged. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/jdk/JdkESLogger.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/jdk/JdkESLogger.java index 092ad365b66..10f161f1bff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/jdk/JdkESLogger.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/jdk/JdkESLogger.java @@ -39,6 +39,20 @@ public class JdkESLogger extends AbstractESLogger { this.name = name; } + @Override public void setLevel(String level) { + if ("error".equalsIgnoreCase(level)) { + logger.setLevel(Level.SEVERE); + } else if ("warn".equalsIgnoreCase(level)) { + logger.setLevel(Level.WARNING); + } else if ("info".equalsIgnoreCase(level)) { + logger.setLevel(Level.INFO); + } else if ("debug".equalsIgnoreCase(level)) { + logger.setLevel(Level.FINE); + } else if ("trace".equalsIgnoreCase(level)) { + logger.setLevel(Level.FINE); + } + } + @Override public String getName() { return logger.getName(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/log4j/Log4jESLogger.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/log4j/Log4jESLogger.java index 74bed114d7d..2f24436ae0d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/log4j/Log4jESLogger.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/log4j/Log4jESLogger.java @@ -35,6 +35,20 @@ public class Log4jESLogger extends AbstractESLogger { this.logger = logger; } + public void setLevel(String level) { + if ("error".equalsIgnoreCase(level)) { + logger.setLevel(Level.ERROR); + } else if ("warn".equalsIgnoreCase(level)) { + logger.setLevel(Level.WARN); + } else if ("info".equalsIgnoreCase(level)) { + logger.setLevel(Level.INFO); + } else if ("debug".equalsIgnoreCase(level)) { + logger.setLevel(Level.DEBUG); + } else if ("trace".equalsIgnoreCase(level)) { + logger.setLevel(Level.TRACE); + } + } + @Override public String getName() { return logger.getName(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/slf4j/Slf4jESLogger.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/slf4j/Slf4jESLogger.java index c3778a27cbf..94f5eba84a7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/slf4j/Slf4jESLogger.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/logging/slf4j/Slf4jESLogger.java @@ -34,6 +34,10 @@ public class Slf4jESLogger extends AbstractESLogger { this.logger = logger; } + @Override public void setLevel(String level) { + // can't set it in slf4j... + } + @Override public String getName() { return logger.getName(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 9355a877f1c..d53db088e22 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.UUID; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -108,7 +109,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final AtomicBoolean initialStateSent = new AtomicBoolean(); @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, - TransportService transportService, ClusterService clusterService, + TransportService transportService, ClusterService clusterService, ClusterSettingsService clusterSettingsService, ZenPingService pingService) { super(settings); this.clusterName = clusterName; @@ -123,7 +124,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen logger.debug("using ping.timeout [{}]", pingTimeout); - this.electMaster = new ElectMasterService(settings); + this.electMaster = new ElectMasterService(settings, clusterSettingsService); this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this); this.masterFD.addListener(new MasterNodeFailureListener()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java index 10a8e44074d..aff52c5a301 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java @@ -19,7 +19,9 @@ package org.elasticsearch.discovery.zen.elect; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -34,13 +36,19 @@ import java.util.List; */ public class ElectMasterService extends AbstractComponent { + static { + MetaData.addDynamicSettings("discovery.zen.minimum_master_nodes"); + } + private final NodeComparator nodeComparator = new NodeComparator(); - private final int minimumMasterNodes; + private volatile int minimumMasterNodes; - public ElectMasterService(Settings settings) { + public ElectMasterService(Settings settings, ClusterSettingsService clusterSettingsService) { super(settings); this.minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1); + logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes); + clusterSettingsService.addListener(new ApplySettings()); } public boolean hasEnoughMasterNodes(Iterable nodes) { @@ -103,6 +111,16 @@ public class ElectMasterService extends AbstractComponent { return possibleNodes; } + class ApplySettings implements ClusterSettingsService.Listener { + @Override public void onRefreshSettings(Settings settings) { + int minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", ElectMasterService.this.minimumMasterNodes); + if (minimumMasterNodes != ElectMasterService.this.minimumMasterNodes) { + logger.info("updating [discovery.zen.minimum_master_nodes] from [{}] to [{}]", ElectMasterService.this.minimumMasterNodes, minimumMasterNodes); + ElectMasterService.this.minimumMasterNodes = minimumMasterNodes; + } + } + } + private static class NodeComparator implements Comparator { @Override public int compare(DiscoveryNode o1, DiscoveryNode o2) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 38e4758d38f..fa8b06e98d5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -250,6 +250,8 @@ public class GatewayService extends AbstractLifecycleComponent i .metaData(currentState.metaData()); metaDataBuilder.version(recoveredState.version()); + metaDataBuilder.persistentSettings(recoveredState.metaData().persistentSettings()); + // add the index templates for (Map.Entry entry : recoveredState.metaData().templates().entrySet()) { metaDataBuilder.put(entry.getValue()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 986222f56a3..f3994b2dc55 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -21,6 +21,8 @@ package org.elasticsearch.index.shard.recovery; import org.apache.lucene.store.IndexInput; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Sets; @@ -42,13 +44,17 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.VoidTransportResponseHandler; import java.io.IOException; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; /** @@ -59,6 +65,10 @@ import java.util.concurrent.atomic.AtomicReference; */ public class RecoverySource extends AbstractComponent { + static { + MetaData.addDynamicSettings("index.shard.recovery.concurrent_streams"); + } + public static class Actions { public static final String START_RECOVERY = "index/shard/recovery/startRecovery"; } @@ -77,16 +87,18 @@ public class RecoverySource extends AbstractComponent { private final int translogOps; private final ByteSizeValue translogSize; - private final ExecutorService concurrentStreamPool; + private volatile int concurrentStreams; + private final ThreadPoolExecutor concurrentStreamPool; - @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) { + @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, + ClusterSettingsService clusterSettingsService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.indicesService = indicesService; - int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); - this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); + this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)); this.translogOps = componentSettings.getAsInt("translog_ops", 1000); @@ -97,6 +109,8 @@ public class RecoverySource extends AbstractComponent { concurrentStreams, fileChunkSize, translogSize, translogOps, compress); transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler()); + + clusterSettingsService.addListener(new ApplySettings()); } public void close() { @@ -297,5 +311,16 @@ public class RecoverySource extends AbstractComponent { channel.sendResponse(response); } } + + class ApplySettings implements ClusterSettingsService.Listener { + @Override public void onRefreshSettings(Settings settings) { + int concurrentStreams = settings.getAsInt("index.shard.recovery.concurrent_streams", RecoverySource.this.concurrentStreams); + if (concurrentStreams != RecoverySource.this.concurrentStreams) { + logger.info("updating [index.shard.recovery.concurrent_streams] from [{}] to [{}]", RecoverySource.this.concurrentStreams, concurrentStreams); + RecoverySource.this.concurrentStreams = concurrentStreams; + RecoverySource.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams); + } + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/XContentRestResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/XContentRestResponse.java index 7450248b3cf..11f730a5cb3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/XContentRestResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/XContentRestResponse.java @@ -51,12 +51,6 @@ public class XContentRestResponse extends AbstractRestResponse { private final XContentBuilder builder; - public XContentRestResponse(RestRequest request, RestStatus status) { - this.builder = null; - this.status = status; - this.prefixUtf8Result = startJsonp(request); - } - public XContentRestResponse(RestRequest request, RestStatus status, XContentBuilder builder) throws IOException { this.builder = builder; this.status = status; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 4b9aef45ac0..9b6e7bb06f4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -30,6 +30,8 @@ import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsActi import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction; import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction; import org.elasticsearch.rest.action.admin.cluster.ping.single.RestSinglePingAction; +import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettingsAction; +import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction; import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction; import org.elasticsearch.rest.action.admin.indices.alias.RestGetIndicesAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction; @@ -92,6 +94,8 @@ public class RestActionModule extends AbstractModule { bind(RestNodesRestartAction.class).asEagerSingleton(); bind(RestClusterStateAction.class).asEagerSingleton(); bind(RestClusterHealthAction.class).asEagerSingleton(); + bind(RestClusterUpdateSettingsAction.class).asEagerSingleton(); + bind(RestClusterGetSettingsAction.class).asEagerSingleton(); bind(RestSinglePingAction.class).asEagerSingleton(); bind(RestBroadcastPingAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterGetSettingsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterGetSettingsAction.java new file mode 100644 index 00000000000..71eab856314 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterGetSettingsAction.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.settings; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.XContentRestResponse; +import org.elasticsearch.rest.XContentThrowableRestResponse; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; +import java.util.Map; + +/** + */ +public class RestClusterGetSettingsAction extends BaseRestHandler { + + @Inject public RestClusterGetSettingsAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(RestRequest.Method.GET, "/_cluster/settings", this); + } + + @Override public void handleRequest(final RestRequest request, final RestChannel channel) { + ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest() + .filterRoutingTable(true) + .filterNodes(true); + client.admin().cluster().state(clusterStateRequest, new ActionListener() { + @Override public void onResponse(ClusterStateResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + + builder.startObject("persistent"); + for (Map.Entry entry : response.state().metaData().persistentSettings().getAsMap().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + + builder.startObject("transient"); + for (Map.Entry entry : response.state().metaData().transientSettings().getAsMap().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + + builder.endObject(); + + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterUpdateSettingsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterUpdateSettingsAction.java new file mode 100644 index 00000000000..6560f55324f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/settings/RestClusterUpdateSettingsAction.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.settings; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.StringRestResponse; +import org.elasticsearch.rest.XContentThrowableRestResponse; + +import java.io.IOException; +import java.util.Map; + +/** + */ +public class RestClusterUpdateSettingsAction extends BaseRestHandler { + + @Inject public RestClusterUpdateSettingsAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(RestRequest.Method.PUT, "/_cluster/settings", this); + } + + @Override public void handleRequest(final RestRequest request, final RestChannel channel) { + final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); + + try { + XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()); + Map source = XContentFactory.xContent(xContentType) + .createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose(); + + if (source.containsKey("transient")) { + clusterUpdateSettingsRequest.transientSettings((Map) source.get("transient")); + } + if (source.containsKey("persistent")) { + clusterUpdateSettingsRequest.persistentSettings((Map) source.get("persistent")); + } + } catch (Exception e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.warn("Failed to send response", e1); + } + return; + } + + client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener() { + @Override public void onResponse(ClusterUpdateSettingsResponse response) { + try { + channel.sendResponse(new StringRestResponse(RestStatus.OK)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override public void onFailure(Throwable e) { + if (logger.isDebugEnabled()) { + logger.debug("failed to handle cluster state", e); + } + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } +} \ No newline at end of file diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java index a764e20e5e2..2718b242505 100755 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.ec2; import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.settings.ClusterSettingsService; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -38,8 +39,8 @@ import org.elasticsearch.transport.TransportService; public class Ec2Discovery extends ZenDiscovery { @Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, ZenPingService pingService, AwsEc2Service ec2Service) { - super(settings, clusterName, threadPool, transportService, clusterService, pingService); + ClusterService clusterService, ClusterSettingsService clusterSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) { + super(settings, clusterName, threadPool, transportService, clusterService, clusterSettingsService, pingService); if (settings.getAsBoolean("cloud.enabled", true)) { ImmutableList zenPings = pingService.zenPings(); UnicastZenPing unicastZenPing = null;