diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java index fc2bf19d0f1..3e4aa30baae 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -67,10 +69,17 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA @Override protected void masterOperation(final UpdateSettingsRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() { + + UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest() + .indices(request.indices()) + .settings(request.settings()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()); + + updateSettingsService.updateSettings(clusterStateUpdateRequest, new ClusterStateUpdateListener() { @Override - public void onSuccess() { - listener.onResponse(new UpdateSettingsResponse()); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new UpdateSettingsResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java new file mode 100644 index 00000000000..5130fa67271 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.settings; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; +import org.elasticsearch.common.settings.Settings; + +/** + * Cluster state update request that allows to update settings for some indices + */ +public class UpdateSettingsClusterStateUpdateRequest extends ClusterStateUpdateRequest { + private Settings settings; + private String[] indices; + + public UpdateSettingsClusterStateUpdateRequest() { + + } + + /** + * Returns the indices that needs to be updated + */ + public String[] indices() { + return indices; + } + + /** + * Sets the indices to update + */ + public UpdateSettingsClusterStateUpdateRequest indices(String[] indices) { + this.indices = indices; + return this; + } + + /** + * Returns the {@link Settings} to update + */ + public Settings settings() { + return settings; + } + + /** + * Sets the {@link Settings} to update + */ + public UpdateSettingsClusterStateUpdateRequest settings(Settings settings) { + this.settings = settings; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java index f2fcec278ed..d32459cfbb1 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java @@ -20,8 +20,9 @@ package org.elasticsearch.action.admin.indices.settings; import org.elasticsearch.ElasticSearchGenerationException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; @@ -39,9 +40,9 @@ import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFr import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream; /** - * + * Request for an update index settings action */ -public class UpdateSettingsRequest extends MasterNodeOperationRequest { +public class UpdateSettingsRequest extends AcknowledgedRequest { private String[] indices; @@ -51,14 +52,14 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest { +public class UpdateSettingsRequestBuilder extends AcknowledgedRequestBuilder { public UpdateSettingsRequestBuilder(IndicesAdminClient indicesClient, String... indices) { super((InternalIndicesAdminClient) indicesClient, new UpdateSettingsRequest(indices)); } + /** + * Sets the indices the update settings will execute on + */ public UpdateSettingsRequestBuilder setIndices(String... indices) { request.indices(indices); return this; } /** - * The settings update. + * Sets the settings to be updated */ public UpdateSettingsRequestBuilder setSettings(Settings settings) { request.settings(settings); @@ -50,7 +53,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update. + * Sets the settings to be updated */ public UpdateSettingsRequestBuilder setSettings(Settings.Builder settings) { request.settings(settings); @@ -58,7 +61,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update (either json/yaml/properties format) + * Sets the settings to be updated (either json/yaml/properties format) */ public UpdateSettingsRequestBuilder setSettings(String source) { request.settings(source); @@ -66,7 +69,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update (either json/yaml/properties format) + * Sets the settings to be updated (either json/yaml/properties format) */ public UpdateSettingsRequestBuilder setSettings(Map source) { request.settings(source); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java index fb9397ceb5a..9ce139477e1 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java @@ -19,27 +19,34 @@ package org.elasticsearch.action.admin.indices.settings; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; /** - * A response for a update settings action. + * A response for an update index settings action */ -public class UpdateSettingsResponse extends ActionResponse { +public class UpdateSettingsResponse extends AcknowledgedResponse { UpdateSettingsResponse() { } + UpdateSettingsResponse(boolean acknowledged) { + super(acknowledged); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + readAcknowledged(in, Version.V_0_90_6); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + writeAcknowledged(out, Version.V_0_90_6); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java index ea5ecbb7006..35155d92c54 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java @@ -26,12 +26,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; - /** * A request to delete an index warmer. */ @@ -41,8 +38,6 @@ public class DeleteWarmerRequest extends AcknowledgedRequest { +public class DeleteWarmerRequestBuilder extends AcknowledgedRequestBuilder { public DeleteWarmerRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new DeleteWarmerRequest()); @@ -48,23 +47,6 @@ public class DeleteWarmerRequestBuilder extends MasterNodeOperationRequestBuilde return this; } - /** - * Sets the maximum wait for acknowledgement from other nodes - */ - public DeleteWarmerRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public DeleteWarmerRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).deleteWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java index 3a3fee6880d..6b83fed11fd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java @@ -20,6 +20,10 @@ package org.elasticsearch.action.admin.indices.warmer.delete; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * A response for a delete warmer. @@ -33,4 +37,16 @@ public class DeleteWarmerResponse extends AcknowledgedResponse { DeleteWarmerResponse(boolean acknowledged) { super(acknowledged); } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out, null); + } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java index 69422e954f3..0b59da78b79 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java @@ -41,9 +41,7 @@ public class PutWarmerRequest extends AcknowledgedRequest { private SearchRequest searchRequest; - PutWarmerRequest() { - } /** diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java index 77b2143dc7e..759d88f1dc2 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java @@ -22,15 +22,14 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; -import org.elasticsearch.common.unit.TimeValue; /** * */ -public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder { +public class PutWarmerRequestBuilder extends AcknowledgedRequestBuilder { public PutWarmerRequestBuilder(IndicesAdminClient indicesClient, String name) { super((InternalIndicesAdminClient) indicesClient, new PutWarmerRequest().name(name)); @@ -64,23 +63,6 @@ public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder

10s. - */ - public PutWarmerRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).putWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java index 58dd2ece8ac..114fb1d5a93 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java @@ -20,6 +20,10 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * The response of put warmer operation. @@ -34,4 +38,15 @@ public class PutWarmerResponse extends AcknowledgedResponse { super(acknowledged); } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out, null); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index 2c61539f18d..02fb99925fd 100644 --- a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -38,11 +38,13 @@ public abstract class AcknowledgedRequest protected AcknowledgedRequest() { } + /** * Allows to set the timeout * @param timeout timeout as a string (e.g. 1s) * @return the request itself */ + @SuppressWarnings("unchecked") public final T timeout(String timeout) { this.timeout = TimeValue.parseTimeValue(timeout, this.timeout); return (T)this; @@ -53,6 +55,7 @@ public abstract class AcknowledgedRequest * @param timeout timeout as a {@link TimeValue} * @return the request itself */ + @SuppressWarnings("unchecked") public final T timeout(TimeValue timeout) { this.timeout = timeout; return (T) this; @@ -83,5 +86,4 @@ public abstract class AcknowledgedRequest timeout.writeTo(out); } } - } diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java new file mode 100644 index 00000000000..4d5c5afebf9 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java @@ -0,0 +1,52 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +import org.elasticsearch.client.internal.InternalGenericClient; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Base request builder for master node operations that support acknowledgements + */ +public abstract class AcknowledgedRequestBuilder, Response extends AcknowledgedResponse, RequestBuilder extends AcknowledgedRequestBuilder> + extends MasterNodeOperationRequestBuilder { + + protected AcknowledgedRequestBuilder(InternalGenericClient client, Request request) { + super(client, request); + } + + /** + * Sets the maximum wait for acknowledgement from other nodes + */ + @SuppressWarnings("unchecked") + public RequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return (RequestBuilder)this; + } + + /** + * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults + * to 10s. + */ + @SuppressWarnings("unchecked") + public RequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return (RequestBuilder)this; + } +} diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java index 99b46fbda61..a8067daa99d 100644 --- a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java @@ -18,6 +18,7 @@ package org.elasticsearch.action.support.master; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -48,15 +49,21 @@ public abstract class AcknowledgedResponse extends ActionResponse { return acknowledged; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - acknowledged = in.readBoolean(); + /** + * Reads the timeout value if on or after the specified min version or if the version is null. + */ + protected void readAcknowledged(StreamInput in, Version minVersion) throws IOException { + if (minVersion == null || in.getVersion().onOrAfter(minVersion)) { + acknowledged = in.readBoolean(); + } } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(acknowledged); + /** + * Writes the timeout value if on or after the specified min version or if the version is null. + */ + protected void writeAcknowledged(StreamOutput out, Version minVersion) throws IOException { + if (minVersion == null || out.getVersion().onOrAfter(minVersion)) { + out.writeBoolean(acknowledged); + } } } diff --git a/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java b/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java index 7c29bef6ac5..cc92dc11fbd 100644 --- a/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.InternalGenericClient; import org.elasticsearch.common.unit.TimeValue; /** + * Base request builder for master node operations */ public abstract class MasterNodeOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends MasterNodeOperationRequestBuilder> extends ActionRequestBuilder { diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java new file mode 100644 index 00000000000..3887eb4df4d --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * Listener used for cluster state updates processing + * Supports acknowledgement logic + */ +public interface ClusterStateUpdateListener { + + /** + * Called when the cluster state update is acknowledged + */ + void onResponse(ClusterStateUpdateResponse response); + + /** + * Called when any error is thrown during the cluster state update processing + */ + void onFailure(Throwable t); +} diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java new file mode 100644 index 00000000000..1d3d8128b59 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +import org.elasticsearch.common.unit.TimeValue; + +/** + * Base class to be used when needing to update the cluster state + * Contains the basic fields that are always needed + */ +public abstract class ClusterStateUpdateRequest> { + + private TimeValue ackTimeout; + private TimeValue masterNodeTimeout; + + /** + * Returns the maximum time interval to wait for acknowledgements + */ + public TimeValue ackTimeout() { + return ackTimeout; + } + + /** + * Sets the acknowledgement timeout + */ + @SuppressWarnings("unchecked") + public T ackTimeout(TimeValue ackTimeout) { + this.ackTimeout = ackTimeout; + return (T) this; + } + + /** + * Returns the maximum time interval to wait for the request to + * be completed on the master node + */ + public TimeValue masterNodeTimeout() { + return masterNodeTimeout; + } + + /** + * Sets the master node timeout + */ + @SuppressWarnings("unchecked") + public T masterNodeTimeout(TimeValue masterNodeTimeout) { + this.masterNodeTimeout = masterNodeTimeout; + return (T) this; + } +} diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java new file mode 100644 index 00000000000..a2633269a6c --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * Base response returned after a cluster state update + */ +public class ClusterStateUpdateResponse { + + private final boolean acknowledged; + + public ClusterStateUpdateResponse(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + /** + * Whether the cluster state update was acknowledged or not + */ + public boolean isAcknowledged() { + return acknowledged; + } +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index cae74163632..795ba87c786 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -21,13 +21,18 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.admin.indices.settings.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -41,7 +46,7 @@ import java.util.*; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; /** - * + * Service responsible for submitting update index settings requests */ public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener { @@ -120,9 +125,14 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build(); final List indices = nrReplicasChanged.get(fNumberOfReplicas); - updateSettings(settings, indices.toArray(new String[indices.size()]), TimeValue.timeValueMinutes(10), new Listener() { + UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest() + .indices(indices.toArray(new String[indices.size()])).settings(settings) + .ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here + .masterNodeTimeout(TimeValue.timeValueMinutes(10)); + + updateSettings(updateRequest, new ClusterStateUpdateListener() { @Override - public void onSuccess() { + public void onResponse(ClusterStateUpdateResponse response) { for (String index : indices) { logger.info("[{}] auto expanded replicas to [{}]", index, fNumberOfReplicas); } @@ -139,9 +149,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } } - public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) { + public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); - for (Map.Entry entry : pSettings.getAsMap().entrySet()) { + for (Map.Entry entry : request.settings().getAsMap().entrySet()) { if (entry.getKey().equals("index")) { continue; } @@ -186,10 +196,31 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } final Settings openSettings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + @Override public TimeValue timeout() { - return masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -199,7 +230,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements @Override public ClusterState execute(ClusterState currentState) { - String[] actualIndices = currentState.metaData().concreteIndices(indices); + String[] actualIndices = currentState.metaData().concreteIndices(request.indices()); RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); MetaData.Builder metaDataBuilder = MetaData.newMetaDataBuilder().metaData(currentState.metaData()); @@ -296,14 +327,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onSuccess(); } }); } - - public static interface Listener { - void onSuccess(); - - void onFailure(Throwable t); - } } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java index 2325e0d62dc..46a6c8d6f8f 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.admin.indices.settings; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.client.Client; @@ -28,16 +27,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.*; -import org.elasticsearch.rest.action.support.RestXContentBuilder; import java.io.IOException; import java.util.Map; import static org.elasticsearch.client.Requests.updateSettingsRequest; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; -import static org.elasticsearch.rest.RestStatus.OK; /** * @@ -55,6 +51,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index"))); updateSettingsRequest.listenerThreaded(false); + updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout())); updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout())); ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder(); @@ -88,28 +85,6 @@ public class RestUpdateSettingsAction extends BaseRestHandler { } updateSettingsRequest.settings(updateSettings); - client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener() { - @Override - public void onResponse(UpdateSettingsResponse updateSettingsResponse) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true) - .endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().updateSettings(updateSettingsRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java new file mode 100644 index 00000000000..45166d32b0d --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -0,0 +1,105 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; +import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.warmer.IndexWarmersMetaData; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.Map; + +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = SUITE) +public class AckTests extends AbstractIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + //to test that the acknowledgement mechanism is working we better disable the wait for publish + //otherwise the operation is most likely acknowledged even if it doesn't support ack + return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build(); + } + + @Test + public void testUpdateSettingsAcknowledgement() { + createIndex("test"); + + UpdateSettingsResponse updateSettingsResponse = client().admin().indices().prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder().put("refresh_interval", 9999)).get(); + assertThat(updateSettingsResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get(); + String refreshInterval = clusterStateResponse.getState().metaData().index("test").settings().get("index.refresh_interval"); + assertThat(refreshInterval, equalTo("9999")); + } + } + + @Test + public void testPutWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); + } + } + + @Test + public void testDeleteWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java index af60d76290f..148a48c6e2e 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.query.QueryBuilders; @@ -226,43 +225,4 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet(); return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total(); } - - @Test - public void testPutWarmerAcknowledgement() { - createIndex("test"); - ensureGreen(); - - PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") - .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) - .get(); - assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); - - for (Client client : clients()) { - GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); - assertThat(getWarmersResponse.warmers().size(), equalTo(1)); - Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); - assertThat(entry.getKey(), equalTo("test")); - assertThat(entry.getValue().size(), equalTo(1)); - assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); - } - } - - @Test - public void testDeleteWarmerAcknowledgement() { - createIndex("test"); - ensureGreen(); - - PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") - .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) - .get(); - assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); - - DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); - assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); - - for (Client client : clients()) { - GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); - assertThat(getWarmersResponse.warmers().size(), equalTo(0)); - } - } }