Added support for acknowledgements to update cluster settings api

As a side note, the internal reroute call is now part of the ack mechanism. That means that if the response contains acknowledged flag, the internal reroute that was eventually issued was acknowledged too. Also, even if the request is not acknowledged, the reroute is issued before returning, which means that there is no need to manually call reroute afterwards to make sure the new settings are immediately applied.

Closes #3995
This commit is contained in:
Luca Cavanna 2013-10-28 17:46:25 +01:00
parent d10bd2495d
commit 9e6fab3a6d
8 changed files with 221 additions and 84 deletions

View File

@ -20,8 +20,9 @@
package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -39,8 +40,9 @@ import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFr
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
/**
* Request for an update cluster settings action
*/
public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest<ClusterUpdateSettingsRequest> {
public class ClusterUpdateSettingsRequest extends AcknowledgedRequest<ClusterUpdateSettingsRequest> {
private Settings transientSettings = EMPTY_SETTINGS;
private Settings persistentSettings = EMPTY_SETTINGS;
@ -65,21 +67,34 @@ public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest<Clu
return persistentSettings;
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequest transientSettings(Settings settings) {
this.transientSettings = settings;
return this;
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequest transientSettings(Settings.Builder settings) {
this.transientSettings = settings.build();
return this;
}
/**
* Sets the source containing the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequest transientSettings(String source) {
this.transientSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build();
return this;
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
@SuppressWarnings("unchecked")
public ClusterUpdateSettingsRequest transientSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
@ -91,21 +106,34 @@ public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest<Clu
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequest persistentSettings(Settings settings) {
this.persistentSettings = settings;
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequest persistentSettings(Settings.Builder settings) {
this.persistentSettings = settings.build();
return this;
}
/**
* Sets the source containing the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequest persistentSettings(String source) {
this.persistentSettings = ImmutableSettings.settingsBuilder().loadFromSource(source).build();
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
@SuppressWarnings("unchecked")
public ClusterUpdateSettingsRequest persistentSettings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
@ -117,12 +145,12 @@ public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest<Clu
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
transientSettings = readSettingsFromStream(in);
persistentSettings = readSettingsFromStream(in);
readTimeout(in, Version.V_0_90_6);
}
@Override
@ -130,5 +158,6 @@ public class ClusterUpdateSettingsRequest extends MasterNodeOperationRequest<Clu
super.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
writeTimeout(out, Version.V_0_90_6);
}
}

View File

@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
import org.elasticsearch.common.settings.Settings;
@ -28,48 +28,73 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* Builder for a cluster update settings request
*/
public class ClusterUpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuilder<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse, ClusterUpdateSettingsRequestBuilder> {
public class ClusterUpdateSettingsRequestBuilder extends AcknowledgedRequestBuilder<ClusterUpdateSettingsRequest, ClusterUpdateSettingsResponse, ClusterUpdateSettingsRequestBuilder> {
public ClusterUpdateSettingsRequestBuilder(ClusterAdminClient clusterClient) {
super((InternalClusterAdminClient) clusterClient, new ClusterUpdateSettingsRequest());
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings settings) {
request.transientSettings(settings);
return this;
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Settings.Builder settings) {
request.transientSettings(settings);
return this;
}
/**
* Sets the source containing the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequestBuilder setTransientSettings(String settings) {
request.transientSettings(settings);
return this;
}
/**
* Sets the transient settings to be updated. They will not survive a full cluster restart
*/
public ClusterUpdateSettingsRequestBuilder setTransientSettings(Map settings) {
request.transientSettings(settings);
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings settings) {
request.persistentSettings(settings);
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Settings.Builder settings) {
request.persistentSettings(settings);
return this;
}
/**
* Sets the source containing the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(String settings) {
request.persistentSettings(settings);
return this;
}
/**
* Sets the persistent settings to be updated. They will get applied cross restarts
*/
public ClusterUpdateSettingsRequestBuilder setPersistentSettings(Map settings) {
request.persistentSettings(settings);
return this;

View File

@ -19,18 +19,19 @@
package org.elasticsearch.action.admin.cluster.settings;
import java.io.IOException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
/**
* A response for a cluster update settings action.
*/
public class ClusterUpdateSettingsResponse extends ActionResponse {
public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
Settings transientSettings;
Settings persistentSettings;
@ -40,7 +41,8 @@ public class ClusterUpdateSettingsResponse extends ActionResponse {
this.transientSettings = ImmutableSettings.EMPTY;
}
ClusterUpdateSettingsResponse(Settings transientSettings, Settings persistentSettings) {
ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
super(acknowledged);
this.persistentSettings = persistentSettings;
this.transientSettings = transientSettings;
}
@ -50,6 +52,7 @@ public class ClusterUpdateSettingsResponse extends ActionResponse {
super.readFrom(in);
transientSettings = ImmutableSettings.readSettingsFromStream(in);
persistentSettings = ImmutableSettings.readSettingsFromStream(in);
readAcknowledged(in, Version.V_0_90_6);
}
public Settings getTransientSettings() {
@ -65,5 +68,6 @@ public class ClusterUpdateSettingsResponse extends ActionResponse {
super.writeTo(out);
ImmutableSettings.writeSettingsToStream(transientSettings, out);
ImmutableSettings.writeSettingsToStream(persistentSettings, out);
writeAcknowledged(out, Version.V_0_90_6);
}
}

View File

@ -22,16 +22,17 @@ package org.elasticsearch.action.admin.cluster.settings;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -42,6 +43,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import static org.elasticsearch.cluster.ClusterState.builder;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
/**
@ -86,7 +88,91 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder();
final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder();
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
private volatile boolean changed = false;
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
if (changed) {
reroute(true);
} else {
listener.onResponse(new ClusterUpdateSettingsResponse(true, transientUpdates.build(), persistentUpdates.build()));
}
}
@Override
public void onAckTimeout() {
if (changed) {
reroute(false);
} else {
listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build()));
}
}
private void reroute(final boolean updateSettingsAcked) {
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
//we wait for the reroute ack only if the update settings was acknowledged
return updateSettingsAcked;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
//we return when the cluster reroute is acked (the acknowledged flag depends on whether the update settings was acknowledged)
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
}
@Override
public void onAckTimeout() {
//we return when the cluster reroute ack times out (acknowledged false)
listener.onResponse(new ClusterUpdateSettingsResponse(false, transientUpdates.build(), persistentUpdates.build()));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
//if the reroute fails we only log
logger.debug("failed to perform [{}]", t, source);
}
@Override
public ClusterState execute(final ClusterState currentState) {
// now, reroute in case things that require it changed (e.g. number of replicas)
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
@ -101,7 +187,6 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
@Override
public ClusterState execute(final ClusterState currentState) {
boolean changed = false;
ImmutableSettings.Builder transientSettings = ImmutableSettings.settingsBuilder();
transientSettings.put(currentState.metaData().transientSettings());
for (Map.Entry<String, String> entry : request.transientSettings().getAsMap().entrySet()) {
@ -152,38 +237,12 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
blocks.removeGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
}
return ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).build();
return builder().state(currentState).metaData(metaData).blocks(blocks).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState == newState) {
// nothing changed...
listener.onResponse(new ClusterUpdateSettingsResponse(transientUpdates.build(), persistentUpdates.build()));
return;
}
// now, reroute
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(final ClusterState currentState) {
try {
// now, reroute in case things change that require it (like number of replicas)
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
} finally {
listener.onResponse(new ClusterUpdateSettingsResponse(transientUpdates.build(), persistentUpdates.build()));
}
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("unexpected failure during [{}]", t, source);
listener.onResponse(new ClusterUpdateSettingsResponse(transientUpdates.build(), persistentUpdates.build()));
}
});
}
});
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.admin.cluster.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.Client;
@ -29,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import java.util.Map;
@ -48,6 +46,7 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.listenerThreaded(false);
clusterUpdateSettingsRequest.timeout(request.paramAsTime("timeout", clusterUpdateSettingsRequest.timeout()));
clusterUpdateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterUpdateSettingsRequest.masterNodeTimeout()));
try {
Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose();
@ -66,13 +65,10 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
return;
}
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<ClusterUpdateSettingsResponse>() {
@Override
public void onResponse(ClusterUpdateSettingsResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new AcknowledgedRestResponseActionListener<ClusterUpdateSettingsResponse>(request, channel, logger) {
@Override
protected void addCustomFields(XContentBuilder builder, ClusterUpdateSettingsResponse response) throws IOException {
builder.startObject("persistent");
for (Map.Entry<String, String> entry : response.getPersistentSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
@ -84,13 +80,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
@ -98,11 +87,7 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
if (logger.isDebugEnabled()) {
logger.debug("failed to handle cluster state", e);
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
super.onFailure(e);
}
});
}

View File

@ -33,10 +33,7 @@ import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -238,4 +235,49 @@ public class AckTests extends AbstractIntegrationTest {
return new MoveAllocationCommand(shardToBeMoved.shardId(), fromNodeId, toNodeId);
}
@Test
public void testClusterUpdateSettingsAcknowledgement() {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_replicas", 0)).get();
ensureGreen();
NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get();
String excludedNodeId = null;
for (NodeInfo nodeInfo : nodesInfo) {
if (nodeInfo.getNode().isDataNode()) {
excludedNodeId = nodesInfo.getAt(0).getNode().id();
break;
}
}
assert excludedNodeId != null;
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", excludedNodeId)).get();
assertThat(clusterUpdateSettingsResponse.isAcknowledged(), equalTo(true));
assertThat(clusterUpdateSettingsResponse.getTransientSettings().get("cluster.routing.allocation.exclude._id"), equalTo(excludedNodeId));
for (Client client : clients()) {
ClusterState clusterState = client.admin().cluster().prepareState().setLocal(true).get().getState();
assertThat(clusterState.routingNodes().metaData().transientSettings().get("cluster.routing.allocation.exclude._id"), equalTo(excludedNodeId));
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (clusterState.nodes().get(shardRouting.currentNodeId()).id().equals(excludedNodeId)) {
//if the shard is still there it must be relocating and all nodes need to know, since the request was acknowledged
assertThat(shardRouting.relocating(), equalTo(true));
}
}
}
}
}
//let's wait for the relocation to be completed, otherwise there can be issues with after test checks (mock directory wrapper etc.)
waitForRelocation();
//removes the allocation exclude settings
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", "")).get();
}
}

View File

@ -205,8 +205,6 @@ public class AwarenessAllocationTests extends AbstractIntegrationTest {
assertThat(counts.containsKey(noZoneNode), equalTo(false));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.settingsBuilder().put("cluster.routing.allocation.awareness.attributes", "").build()).get();
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes("4").setWaitForActiveShards(10).setWaitForRelocatingShards(0).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

View File

@ -35,8 +35,6 @@ import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@ClusterScope(scope=Scope.TEST, numNodes=0)
public class FilteringAllocationTests extends AbstractIntegrationTest {
@ -65,9 +63,7 @@ public class FilteringAllocationTests extends AbstractIntegrationTest {
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._name", node_1))
.execute().actionGet();
client().admin().cluster().prepareReroute().get();
ensureGreen();
waitForRelocation();
logger.info("--> verify all are allocated on node1 now");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
@ -140,7 +136,6 @@ public class FilteringAllocationTests extends AbstractIntegrationTest {
client().admin().indices().prepareUpdateSettings("test")
.setSettings(settingsBuilder().put("index.routing.allocation.exclude._name", ""))
.execute().actionGet();
client().admin().cluster().prepareReroute().get();
ensureGreen();