Cluster Update Settings API, closes #1266.

This commit is contained in:
Shay Banon 2011-08-20 04:00:41 +03:00
parent a83c45be22
commit 03217c460a
33 changed files with 1029 additions and 32 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexRep
import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportShardReplicationPingAction;
import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction;
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.analyze.TransportAnalyzeAction;
@ -90,6 +91,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportNodesRestartAction.class).asEagerSingleton();
bind(TransportClusterStateAction.class).asEagerSingleton();
bind(TransportClusterHealthAction.class).asEagerSingleton();
bind(TransportClusterUpdateSettingsAction.class).asEagerSingleton();
bind(TransportSinglePingAction.class).asEagerSingleton();
bind(TransportBroadcastPingAction.class).asEagerSingleton();

View File

@ -85,6 +85,7 @@ public class TransportActions {
public static final String STATE = "/cluster/state";
public static final String HEALTH = "/cluster/health";
public static final String UPDATE_SETTINGS = "/cluster/updateSettings";
public static class Node {
public static final String INFO = "/cluster/nodes/info";

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.action.Actions.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/**
*/
public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest {
private Settings transientSettings = EMPTY_SETTINGS;
private Settings persistentSettings = EMPTY_SETTINGS;
public ClusterUpdateSettingsRequest() {
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (transientSettings.getAsMap().isEmpty() && persistentSettings.getAsMap().isEmpty()) {
validationException = addValidationError("no settings to update", validationException);
}
return validationException;
}
Settings transientSettings() {
return transientSettings;
}
Settings persistentSettings() {
return persistentSettings;
}
public ClusterUpdateSettingsRequest transientSettings(Settings settings) {
this.transientSettings = settings;
return this;
}
public ClusterUpdateSettingsRequest transientSettings(Settings.Builder settings) {
this.transientSettings = settings.build();
return this;
}
public ClusterUpdateSettingsRequest transientSettings(String source) {
this.transientSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build();
return this;
}
public ClusterUpdateSettingsRequest transientSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(source);
transientSettings(builder.string());
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}
public ClusterUpdateSettingsRequest persistentSettings(Settings settings) {
this.persistentSettings = settings;
return this;
}
public ClusterUpdateSettingsRequest persistentSettings(Settings.Builder settings) {
this.persistentSettings = settings.build();
return this;
}
public ClusterUpdateSettingsRequest persistentSettings(String source) {
this.persistentSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build();
return this;
}
public ClusterUpdateSettingsRequest persistentSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(source);
persistentSettings(builder.string());
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
transientSettings = readSettingsFromStream(in);
persistentSettings = readSettingsFromStream(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a cluster update settings action.
*/
public class ClusterUpdateSettingsResponse implements ActionResponse, Streamable {
ClusterUpdateSettingsResponse() {
}
@Override public void readFrom(StreamInput in) throws IOException {
}
@Override public void writeTo(StreamOutput out) throws IOException {
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author kimchy (shay.banon)
*/
public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOperationAction<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse> {
@Inject public TransportClusterUpdateSettingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
}
@Override protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.UPDATE_SETTINGS;
}
@Override protected ClusterUpdateSettingsRequest newRequest() {
return new ClusterUpdateSettingsRequest();
}
@Override protected ClusterUpdateSettingsResponse newResponse() {
return new ClusterUpdateSettingsResponse();
}
@Override protected ClusterUpdateSettingsResponse masterOperation(final ClusterUpdateSettingsRequest request, ClusterState state) throws ElasticSearchException {
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("cluster_update_settings", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
try {
boolean changed = false;
ImmutableSettings.Builder transientSettings = ImmutableSettings.settingsBuilder();
transientSettings.put(currentState.metaData().transientSettings());
for (Map.Entry<String, String> entry : request.transientSettings().getAsMap().entrySet()) {
if (MetaData.dynamicSettings().contains(entry.getKey()) || entry.getKey().startsWith("logger.")) {
transientSettings.put(entry.getKey(), entry.getValue());
changed = true;
} else {
logger.warn("ignoring transient setting [{}], not dynamically updateable", entry.getKey());
}
}
ImmutableSettings.Builder persistentSettings = ImmutableSettings.settingsBuilder();
persistentSettings.put(currentState.metaData().persistentSettings());
for (Map.Entry<String, String> entry : request.persistentSettings().getAsMap().entrySet()) {
if (MetaData.dynamicSettings().contains(entry.getKey()) || entry.getKey().startsWith("logger.")) {
changed = true;
persistentSettings.put(entry.getKey(), entry.getValue());
} else {
logger.warn("ignoring persistent setting [{}], not dynamically updateable", entry.getKey());
}
}
if (!changed) {
return currentState;
}
MetaData.Builder metaData = MetaData.builder().metaData(currentState.metaData())
.persistentSettings(persistentSettings.build())
.transientSettings(transientSettings.build());
return ClusterState.builder().state(currentState).metaData(metaData).build();
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
failureRef.set(e);
}
if (failureRef.get() != null) {
if (failureRef.get() instanceof ElasticSearchException) {
throw (ElasticSearchException) failureRef.get();
} else {
throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get());
}
}
return new ClusterUpdateSettingsResponse();
}
}

View File

@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRe
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.action.admin.cluster.health.ClusterHealthRequestBuilder;
@ -47,6 +49,7 @@ import org.elasticsearch.client.action.admin.cluster.node.stats.NodesStatsReques
import org.elasticsearch.client.action.admin.cluster.ping.broadcast.BroadcastPingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.ping.replication.ReplicationPingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.ping.single.SinglePingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.state.ClusterStateRequestBuilder;
/**
@ -103,6 +106,21 @@ public interface ClusterAdminClient {
*/
ClusterStateRequestBuilder prepareState();
/**
* Updates settings in the cluster.
*/
ActionFuture<ClusterUpdateSettingsResponse> updateSettings(ClusterUpdateSettingsRequest request);
/**
* Update settings in the cluster.
*/
void updateSettings(ClusterUpdateSettingsRequest request, ActionListener<ClusterUpdateSettingsResponse> listener);
/**
* Update settings in the cluster.
*/
ClusterUpdateSettingsRequestBuilder prepareUpdateSettings();
/**
* Nodes info of the cluster.
*

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
@ -353,6 +354,10 @@ public class Requests {
return new ClusterStateRequest();
}
public static ClusterUpdateSettingsRequest clusterUpdateSettingsRequest() {
return new ClusterUpdateSettingsRequest();
}
/**
* Creates a cluster health request.
*

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.action.admin.cluster.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.action.admin.cluster.support.BaseClusterRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Map;
/**
*/
public class ClusterUpdateSettingsRequestBuilder extends BaseClusterRequestBuilder<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse> {
public ClusterUpdateSettingsRequestBuilder(ClusterAdminClient clusterClient) {
super(clusterClient, new ClusterUpdateSettingsRequest());
}
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings settings) {
request.transientSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings.Builder settings) {
request.transientSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setTransientSettings(String settings) {
request.transientSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Map settings) {
request.transientSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings settings) {
request.persistentSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings.Builder settings) {
request.persistentSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(String settings) {
request.persistentSettings(settings);
return this;
}
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Map settings) {
request.persistentSettings(settings);
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public ClusterUpdateSettingsRequestBuilder setMasterNodeTimeout(TimeValue timeout) {
request.masterNodeTimeout(timeout);
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/
public ClusterUpdateSettingsRequestBuilder setMasterNodeTimeout(String timeout) {
request.masterNodeTimeout(timeout);
return this;
}
@Override protected void doExecute(ActionListener<ClusterUpdateSettingsResponse> listener) {
client.updateSettings(request, listener);
}
}

View File

@ -45,6 +45,9 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicat
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse;
import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
@ -65,6 +68,8 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
private final TransportClusterStateAction clusterStateAction;
private final TransportClusterUpdateSettingsAction clusterUpdateSettingsAction;
private final TransportSinglePingAction singlePingAction;
private final TransportBroadcastPingAction broadcastPingAction;
@ -80,12 +85,13 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
private final TransportNodesRestartAction nodesRestart;
@Inject public NodeClusterAdminClient(Settings settings, ThreadPool threadPool,
TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction,
TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction, TransportClusterUpdateSettingsAction clusterUpdateSettingsAction,
TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction,
TransportNodesInfoAction nodesInfoAction, TransportNodesShutdownAction nodesShutdown, TransportNodesRestartAction nodesRestart, TransportNodesStatsAction nodesStatsAction) {
this.threadPool = threadPool;
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.clusterUpdateSettingsAction = clusterUpdateSettingsAction;
this.nodesInfoAction = nodesInfoAction;
this.nodesShutdown = nodesShutdown;
this.nodesRestart = nodesRestart;
@ -115,6 +121,14 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
clusterStateAction.execute(request, listener);
}
@Override public ActionFuture<ClusterUpdateSettingsResponse> updateSettings(ClusterUpdateSettingsRequest request) {
return clusterUpdateSettingsAction.execute(request);
}
@Override public void updateSettings(ClusterUpdateSettingsRequest request, ActionListener<ClusterUpdateSettingsResponse> listener) {
clusterUpdateSettingsAction.execute(request, listener);
}
@Override public ActionFuture<SinglePingResponse> ping(SinglePingRequest request) {
return singlePingAction.execute(request);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.client.action.admin.cluster.node.stats.NodesStatsReques
import org.elasticsearch.client.action.admin.cluster.ping.broadcast.BroadcastPingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.ping.replication.ReplicationPingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.ping.single.SinglePingRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.client.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
@ -43,6 +44,10 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin
return new ClusterStateRequestBuilder(this);
}
@Override public ClusterUpdateSettingsRequestBuilder prepareUpdateSettings() {
return new ClusterUpdateSettingsRequestBuilder(this);
}
@Override public NodesInfoRequestBuilder prepareNodesInfo(String... nodesIds) {
return new NodesInfoRequestBuilder(this).setNodesIds(nodesIds);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.node.stats.Client
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction;
import org.elasticsearch.client.transport.action.admin.cluster.settings.ClientTransportClusterUpdateSettingsAction;
import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction;
import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction;
import org.elasticsearch.client.transport.action.admin.indices.analyze.ClientTransportAnalyzeAction;
@ -105,5 +106,6 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportBroadcastPingAction.class).asEagerSingleton();
bind(ClientTransportClusterStateAction.class).asEagerSingleton();
bind(ClientTransportClusterHealthAction.class).asEagerSingleton();
bind(ClientTransportClusterUpdateSettingsAction.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.admin.cluster.settings;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
/**
*/
public class ClientTransportClusterUpdateSettingsAction extends BaseClientTransportAction<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse> {
@Inject public ClientTransportClusterUpdateSettingsAction(Settings settings, TransportService transportService) {
super(settings, transportService, ClusterUpdateSettingsResponse.class);
}
@Override protected String action() {
return TransportActions.Admin.Cluster.UPDATE_SETTINGS;
}
}

View File

@ -38,6 +38,8 @@ import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRe
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
@ -51,6 +53,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.node.stats.Client
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction;
import org.elasticsearch.client.transport.action.admin.cluster.settings.ClientTransportClusterUpdateSettingsAction;
import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@ -70,6 +73,8 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
private final ClientTransportClusterStateAction clusterStateAction;
private final ClientTransportClusterUpdateSettingsAction clusterUpdateSettingsAction;
private final ClientTransportSinglePingAction singlePingAction;
private final ClientTransportReplicationPingAction replicationPingAction;
@ -85,13 +90,14 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
private final ClientTransportNodesRestartAction nodesRestartAction;
@Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool,
ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction,
ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction, ClientTransportClusterUpdateSettingsAction clusterUpdateSettingsAction,
ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction,
ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction, ClientTransportNodesRestartAction nodesRestartAction, ClientTransportNodesStatsAction nodesStatsAction) {
this.nodesService = nodesService;
this.threadPool = threadPool;
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.clusterUpdateSettingsAction = clusterUpdateSettingsAction;
this.nodesInfoAction = nodesInfoAction;
this.nodesShutdownAction = nodesShutdownAction;
this.nodesRestartAction = nodesRestartAction;
@ -137,6 +143,22 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
}, listener);
}
@Override public ActionFuture<ClusterUpdateSettingsResponse> updateSettings(final ClusterUpdateSettingsRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterUpdateSettingsResponse>>() {
@Override public ActionFuture<ClusterUpdateSettingsResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return clusterUpdateSettingsAction.execute(node, request);
}
});
}
@Override public void updateSettings(final ClusterUpdateSettingsRequest request, final ActionListener<ClusterUpdateSettingsResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<ClusterUpdateSettingsResponse>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<ClusterUpdateSettingsResponse> listener) throws ElasticSearchException {
clusterUpdateSettingsAction.execute(node, request, listener);
}
}, listener);
}
@Override public ActionFuture<SinglePingResponse> ping(final SinglePingRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SinglePingResponse>>() {
@Override public ActionFuture<SinglePingResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {

View File

@ -19,13 +19,25 @@
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.action.index.*;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.cluster.routing.operation.OperationRoutingModule;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
@ -50,6 +62,9 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
@Override
protected void configure() {
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(ClusterSettingsService.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
bind(MetaDataStateIndexService.class).asEagerSingleton();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.set.hash.THashSet;
import org.elasticsearch.common.util.concurrent.Immutable;
@ -43,6 +44,7 @@ import org.elasticsearch.indices.IndexMissingException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -59,10 +61,26 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*;
@Immutable
public class MetaData implements Iterable<IndexMetaData> {
private static ImmutableSet<String> dynamicSettings = ImmutableSet.<String>builder()
.build();
public static ImmutableSet<String> dynamicSettings() {
return dynamicSettings;
}
public static synchronized void addDynamicSettings(String... settings) {
HashSet<String> updatedSettings = new HashSet<String>(dynamicSettings);
updatedSettings.addAll(Arrays.asList(settings));
dynamicSettings = ImmutableSet.copyOf(updatedSettings);
}
public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build();
private final long version;
private final Settings transientSettings;
private final Settings persistentSettings;
private final Settings settings;
private final ImmutableMap<String, IndexMetaData> indices;
private final ImmutableMap<String, IndexTemplateMetaData> templates;
@ -80,8 +98,11 @@ public class MetaData implements Iterable<IndexMetaData> {
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
private MetaData(long version, ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates) {
MetaData(long version, Settings transientSettings, Settings persistentSettings, ImmutableMap<String, IndexMetaData> indices, ImmutableMap<String, IndexTemplateMetaData> templates) {
this.version = version;
this.transientSettings = transientSettings;
this.persistentSettings = persistentSettings;
this.settings = ImmutableSettings.settingsBuilder().put(persistentSettings).put(transientSettings).build();
this.indices = ImmutableMap.copyOf(indices);
this.templates = templates;
int totalNumberOfShards = 0;
@ -194,6 +215,21 @@ public class MetaData implements Iterable<IndexMetaData> {
return this.version;
}
/**
* Returns the merges transient and persistent settings.
*/
public Settings settings() {
return this.settings;
}
public Settings transientSettings() {
return this.transientSettings;
}
public Settings persistentSettings() {
return this.persistentSettings;
}
public ImmutableMap<String, ImmutableMap<String, AliasMetaData>> aliases() {
return this.aliases;
}
@ -601,11 +637,16 @@ public class MetaData implements Iterable<IndexMetaData> {
private long version;
private Settings transientSettings = ImmutableSettings.Builder.EMPTY_SETTINGS;
private Settings persistentSettings = ImmutableSettings.Builder.EMPTY_SETTINGS;
private MapBuilder<String, IndexMetaData> indices = newMapBuilder();
private MapBuilder<String, IndexTemplateMetaData> templates = newMapBuilder();
public Builder metaData(MetaData metaData) {
this.transientSettings = metaData.transientSettings;
this.persistentSettings = metaData.persistentSettings;
this.version = metaData.version;
this.indices.putAll(metaData.indices);
this.templates.putAll(metaData.templates);
@ -674,13 +715,23 @@ public class MetaData implements Iterable<IndexMetaData> {
return this;
}
public Builder transientSettings(Settings settings) {
this.transientSettings = settings;
return this;
}
public Builder persistentSettings(Settings settings) {
this.persistentSettings = settings;
return this;
}
public Builder version(long version) {
this.version = version;
return this;
}
public MetaData build() {
return new MetaData(version, indices.immutableMap(), templates.immutableMap());
return new MetaData(version, transientSettings, persistentSettings, indices.immutableMap(), templates.immutableMap());
}
public static String toXContent(MetaData metaData) throws IOException {
@ -694,6 +745,14 @@ public class MetaData implements Iterable<IndexMetaData> {
public static void toXContent(MetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("meta-data");
if (!metaData.persistentSettings().getAsMap().isEmpty()) {
builder.startObject("settings");
for (Map.Entry<String, String> entry : metaData.persistentSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
}
builder.startObject("templates");
for (IndexTemplateMetaData template : metaData.templates().values()) {
IndexTemplateMetaData.Builder.toXContent(template, builder, params);
@ -727,7 +786,16 @@ public class MetaData implements Iterable<IndexMetaData> {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("indices".equals(currentFieldName)) {
if ("settings".equals(currentFieldName)) {
ImmutableSettings.Builder settingsBuilder = settingsBuilder();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
String key = parser.currentName();
token = parser.nextToken();
String value = parser.text();
settingsBuilder.put(key, value);
}
builder.persistentSettings(settingsBuilder.build());
} else if ("indices".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexMetaData.Builder.fromXContent(parser));
}
@ -744,6 +812,8 @@ public class MetaData implements Iterable<IndexMetaData> {
public static MetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readLong();
builder.transientSettings(readSettingsFromStream(in));
builder.persistentSettings(readSettingsFromStream(in));
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexMetaData.Builder.readFrom(in));
@ -757,6 +827,8 @@ public class MetaData implements Iterable<IndexMetaData> {
public static void writeTo(MetaData metaData, StreamOutput out) throws IOException {
out.writeLong(metaData.version);
writeSettingsToStream(metaData.transientSettings(), out);
writeSettingsToStream(metaData.persistentSettings(), out);
out.writeVInt(metaData.indices.size());
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder.writeTo(indexMetaData, out);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -36,11 +37,11 @@ public class NodeAllocations extends NodeAllocation {
private final NodeAllocation[] allocations;
public NodeAllocations(Settings settings) {
public NodeAllocations(Settings settings, ClusterSettingsService clusterSettingsService) {
this(settings, ImmutableSet.<NodeAllocation>builder()
.add(new SameShardNodeAllocation(settings))
.add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
.add(new ThrottlingNodeAllocation(settings))
.add(new ThrottlingNodeAllocation(settings, clusterSettingsService))
.add(new RebalanceOnlyWhenActiveNodeAllocation(settings))
.add(new ClusterRebalanceNodeAllocation(settings))
.add(new ConcurrentRebalanceNodeAllocation(settings))

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -52,7 +53,7 @@ public class ShardsAllocation extends AbstractComponent {
}
public ShardsAllocation(Settings settings) {
this(settings, new NodeAllocations(settings));
this(settings, new NodeAllocations(settings, new ClusterSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)));
}
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) {

View File

@ -19,10 +19,12 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -31,15 +33,24 @@ import org.elasticsearch.common.settings.Settings;
*/
public class ThrottlingNodeAllocation extends NodeAllocation {
private final int primariesInitialRecoveries;
private final int concurrentRecoveries;
static {
MetaData.addDynamicSettings(
"cluster.routing.allocation.node_initial_primaries_recoveries",
"cluster.routing.allocation.node_concurrent_recoveries"
);
}
@Inject public ThrottlingNodeAllocation(Settings settings) {
private volatile int primariesInitialRecoveries;
private volatile int concurrentRecoveries;
@Inject public ThrottlingNodeAllocation(Settings settings, ClusterSettingsService clusterSettingsService) {
super(settings);
this.primariesInitialRecoveries = componentSettings.getAsInt("node_initial_primaries_recoveries", 4);
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", componentSettings.getAsInt("node_concurrent_recoveries", 2));
logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
clusterSettingsService.addListener(new ApplySettings());
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
@ -83,4 +94,20 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
return Decision.YES;
}
}
class ApplySettings implements ClusterSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
int primariesInitialRecoveries = settings.getAsInt("cluster.routing.allocation.node_initial_primaries_recoveries", ThrottlingNodeAllocation.this.primariesInitialRecoveries);
if (primariesInitialRecoveries != ThrottlingNodeAllocation.this.primariesInitialRecoveries) {
logger.info("updating [cluster.routing.allocation.node_initial_primaries_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.primariesInitialRecoveries, primariesInitialRecoveries);
ThrottlingNodeAllocation.this.primariesInitialRecoveries = primariesInitialRecoveries;
}
int concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", ThrottlingNodeAllocation.this.concurrentRecoveries);
if (concurrentRecoveries != ThrottlingNodeAllocation.this.concurrentRecoveries) {
logger.info("updating [cluster.routing.allocation.node_concurrent_recoveries] from [{}] to [{}]", ThrottlingNodeAllocation.this.concurrentRecoveries, concurrentRecoveries);
ThrottlingNodeAllocation.this.concurrentRecoveries = concurrentRecoveries;
}
}
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -70,6 +71,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final TransportService transportService;
private final ClusterSettingsService clusterSettingsService;
private final TimeValue reconnectInterval;
private volatile ExecutorService updateTasksExecutor;
@ -85,16 +88,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ScheduledFuture reconnectToNodes;
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool) {
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService,
ClusterSettingsService clusterSettingsService, ThreadPool threadPool) {
super(settings);
this.operationRouting = operationRouting;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.threadPool = threadPool;
this.clusterSettingsService = clusterSettingsService;
this.clusterSettingsService.setClusterService(this);
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(10));
}
public ClusterSettingsService settingsService() {
return this.clusterSettingsService;
}
public void addInitialStateBlock(ClusterBlock block) throws ElasticSearchIllegalStateException {
if (lifecycle.started()) {
throw new ElasticSearchIllegalStateException("can't set initial block when started");

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.settings;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
public class ClusterSettingsService extends AbstractComponent implements ClusterStateListener {
private volatile Settings lastSettingsApplied;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject public ClusterSettingsService(Settings settings) {
super(settings);
}
// inject it as a member, so we won't get into possible cyclic problems
public void setClusterService(ClusterService clusterService) {
clusterService.add(this);
}
@Override public void clusterChanged(ClusterChangedEvent event) {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (event.state().blocks().disableStatePersistence()) {
return;
}
if (!event.metaDataChanged()) {
// nothing changed in the metadata, no need to check
return;
}
if (lastSettingsApplied != null && event.state().metaData().settings().equals(lastSettingsApplied)) {
// nothing changed in the settings, ignore
return;
}
for (Listener listener : listeners) {
try {
listener.onRefreshSettings(event.state().metaData().settings());
} catch (Exception e) {
logger.warn("failed to refresh settings for [{}]", e, listener);
}
}
try {
for (Map.Entry<String, String> entry : event.state().metaData().settings().getAsMap().entrySet()) {
if (entry.getKey().startsWith("logger.")) {
String component = entry.getKey().substring("logger.".length());
ESLoggerFactory.getLogger(component, entry.getValue()).setLevel(entry.getValue());
}
}
} catch (Exception e) {
logger.warn("failed to refresh settings for [{}]", e, "logger");
}
lastSettingsApplied = event.state().metaData().settings();
}
public void addListener(Listener listener) {
this.listeners.add(listener);
}
public void removeListener(Listener listener) {
this.listeners.remove(listener);
}
public static interface Listener {
void onRefreshSettings(Settings settings);
}
}

View File

@ -28,6 +28,8 @@ public interface ESLogger {
String getName();
void setLevel(String level);
/**
* Returns {@code true} if a TRACE level message is logged.
*/

View File

@ -39,6 +39,20 @@ public class JdkESLogger extends AbstractESLogger {
this.name = name;
}
@Override public void setLevel(String level) {
if ("error".equalsIgnoreCase(level)) {
logger.setLevel(Level.SEVERE);
} else if ("warn".equalsIgnoreCase(level)) {
logger.setLevel(Level.WARNING);
} else if ("info".equalsIgnoreCase(level)) {
logger.setLevel(Level.INFO);
} else if ("debug".equalsIgnoreCase(level)) {
logger.setLevel(Level.FINE);
} else if ("trace".equalsIgnoreCase(level)) {
logger.setLevel(Level.FINE);
}
}
@Override public String getName() {
return logger.getName();
}

View File

@ -35,6 +35,20 @@ public class Log4jESLogger extends AbstractESLogger {
this.logger = logger;
}
public void setLevel(String level) {
if ("error".equalsIgnoreCase(level)) {
logger.setLevel(Level.ERROR);
} else if ("warn".equalsIgnoreCase(level)) {
logger.setLevel(Level.WARN);
} else if ("info".equalsIgnoreCase(level)) {
logger.setLevel(Level.INFO);
} else if ("debug".equalsIgnoreCase(level)) {
logger.setLevel(Level.DEBUG);
} else if ("trace".equalsIgnoreCase(level)) {
logger.setLevel(Level.TRACE);
}
}
@Override public String getName() {
return logger.getName();
}

View File

@ -34,6 +34,10 @@ public class Slf4jESLogger extends AbstractESLogger {
this.logger = logger;
}
@Override public void setLevel(String level) {
// can't set it in slf4j...
}
@Override public String getName() {
return logger.getName();
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -108,7 +109,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService,
TransportService transportService, ClusterService clusterService, ClusterSettingsService clusterSettingsService,
ZenPingService pingService) {
super(settings);
this.clusterName = clusterName;
@ -123,7 +124,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.debug("using ping.timeout [{}]", pingTimeout);
this.electMaster = new ElectMasterService(settings);
this.electMaster = new ElectMasterService(settings, clusterSettingsService);
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD.addListener(new MasterNodeFailureListener());

View File

@ -19,7 +19,9 @@
package org.elasticsearch.discovery.zen.elect;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -34,13 +36,19 @@ import java.util.List;
*/
public class ElectMasterService extends AbstractComponent {
static {
MetaData.addDynamicSettings("discovery.zen.minimum_master_nodes");
}
private final NodeComparator nodeComparator = new NodeComparator();
private final int minimumMasterNodes;
private volatile int minimumMasterNodes;
public ElectMasterService(Settings settings) {
public ElectMasterService(Settings settings, ClusterSettingsService clusterSettingsService) {
super(settings);
this.minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
clusterSettingsService.addListener(new ApplySettings());
}
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
@ -103,6 +111,16 @@ public class ElectMasterService extends AbstractComponent {
return possibleNodes;
}
class ApplySettings implements ClusterSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
int minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", ElectMasterService.this.minimumMasterNodes);
if (minimumMasterNodes != ElectMasterService.this.minimumMasterNodes) {
logger.info("updating [discovery.zen.minimum_master_nodes] from [{}] to [{}]", ElectMasterService.this.minimumMasterNodes, minimumMasterNodes);
ElectMasterService.this.minimumMasterNodes = minimumMasterNodes;
}
}
}
private static class NodeComparator implements Comparator<DiscoveryNode> {
@Override public int compare(DiscoveryNode o1, DiscoveryNode o2) {

View File

@ -250,6 +250,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
.metaData(currentState.metaData());
metaDataBuilder.version(recoveredState.version());
metaDataBuilder.persistentSettings(recoveredState.metaData().persistentSettings());
// add the index templates
for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) {
metaDataBuilder.put(entry.getValue());

View File

@ -21,6 +21,8 @@ package org.elasticsearch.index.shard.recovery;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
@ -42,13 +44,17 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -59,6 +65,10 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RecoverySource extends AbstractComponent {
static {
MetaData.addDynamicSettings("index.shard.recovery.concurrent_streams");
}
public static class Actions {
public static final String START_RECOVERY = "index/shard/recovery/startRecovery";
}
@ -77,16 +87,18 @@ public class RecoverySource extends AbstractComponent {
private final int translogOps;
private final ByteSizeValue translogSize;
private final ExecutorService concurrentStreamPool;
private volatile int concurrentStreams;
private final ThreadPoolExecutor concurrentStreamPool;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) {
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
ClusterSettingsService clusterSettingsService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
this.translogOps = componentSettings.getAsInt("translog_ops", 1000);
@ -97,6 +109,8 @@ public class RecoverySource extends AbstractComponent {
concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
clusterSettingsService.addListener(new ApplySettings());
}
public void close() {
@ -297,5 +311,16 @@ public class RecoverySource extends AbstractComponent {
channel.sendResponse(response);
}
}
class ApplySettings implements ClusterSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
int concurrentStreams = settings.getAsInt("index.shard.recovery.concurrent_streams", RecoverySource.this.concurrentStreams);
if (concurrentStreams != RecoverySource.this.concurrentStreams) {
logger.info("updating [index.shard.recovery.concurrent_streams] from [{}] to [{}]", RecoverySource.this.concurrentStreams, concurrentStreams);
RecoverySource.this.concurrentStreams = concurrentStreams;
RecoverySource.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams);
}
}
}
}

View File

@ -51,12 +51,6 @@ public class XContentRestResponse extends AbstractRestResponse {
private final XContentBuilder builder;
public XContentRestResponse(RestRequest request, RestStatus status) {
this.builder = null;
this.status = status;
this.prefixUtf8Result = startJsonp(request);
}
public XContentRestResponse(RestRequest request, RestStatus status, XContentBuilder builder) throws IOException {
this.builder = builder;
this.status = status;

View File

@ -30,6 +30,8 @@ import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsActi
import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction;
import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction;
import org.elasticsearch.rest.action.admin.cluster.ping.single.RestSinglePingAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestGetIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
@ -92,6 +94,8 @@ public class RestActionModule extends AbstractModule {
bind(RestNodesRestartAction.class).asEagerSingleton();
bind(RestClusterStateAction.class).asEagerSingleton();
bind(RestClusterHealthAction.class).asEagerSingleton();
bind(RestClusterUpdateSettingsAction.class).asEagerSingleton();
bind(RestClusterGetSettingsAction.class).asEagerSingleton();
bind(RestSinglePingAction.class).asEagerSingleton();
bind(RestBroadcastPingAction.class).asEagerSingleton();

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import java.util.Map;
/**
*/
public class RestClusterGetSettingsAction extends BaseRestHandler {
@Inject public RestClusterGetSettingsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/settings", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest()
.filterRoutingTable(true)
.filterNodes(true);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override public void onResponse(ClusterStateResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.startObject("persistent");
for (Map.Entry<String, String> entry : response.state().metaData().persistentSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.startObject("transient");
for (Map.Entry<String, String> entry : response.state().metaData().transientSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.StringRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import java.io.IOException;
import java.util.Map;
/**
*/
public class RestClusterUpdateSettingsAction extends BaseRestHandler {
@Inject public RestClusterUpdateSettingsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.PUT, "/_cluster/settings", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
try {
XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
Map<String, Object> source = XContentFactory.xContent(xContentType)
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose();
if (source.containsKey("transient")) {
clusterUpdateSettingsRequest.transientSettings((Map) source.get("transient"));
}
if (source.containsKey("persistent")) {
clusterUpdateSettingsRequest.persistentSettings((Map) source.get("persistent"));
}
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
}
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<ClusterUpdateSettingsResponse>() {
@Override public void onResponse(ClusterUpdateSettingsResponse response) {
try {
channel.sendResponse(new StringRestResponse(RestStatus.OK));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
if (logger.isDebugEnabled()) {
logger.debug("failed to handle cluster state", e);
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery.ec2;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.settings.ClusterSettingsService;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -38,8 +39,8 @@ import org.elasticsearch.transport.TransportService;
public class Ec2Discovery extends ZenDiscovery {
@Inject public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPingService pingService, AwsEc2Service ec2Service) {
super(settings, clusterName, threadPool, transportService, clusterService, pingService);
ClusterService clusterService, ClusterSettingsService clusterSettingsService, ZenPingService pingService, AwsEc2Service ec2Service) {
super(settings, clusterName, threadPool, transportService, clusterService, clusterSettingsService, pingService);
if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;