From 5f1ebf20f74219d730e3ea55591a29dccbcff598 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Sat, 26 Oct 2013 01:55:32 +0200 Subject: [PATCH] Added support for acknowledgement in update index settings api Added support for serialization based on version to AcknowledgedResponse. Useful in api that don't support yet the acknowledged flag in the response. Moved also ack warmer tests to more specific AckTests class Close #3983 --- .../TransportUpdateSettingsAction.java | 15 ++- ...dateSettingsClusterStateUpdateRequest.java | 65 +++++++++++ .../settings/UpdateSettingsRequest.java | 25 +++-- .../UpdateSettingsRequestBuilder.java | 17 +-- .../settings/UpdateSettingsResponse.java | 13 ++- .../warmer/delete/DeleteWarmerRequest.java | 8 +- .../delete/DeleteWarmerRequestBuilder.java | 22 +--- .../warmer/delete/DeleteWarmerResponse.java | 16 +++ .../indices/warmer/put/PutWarmerRequest.java | 2 - .../warmer/put/PutWarmerRequestBuilder.java | 22 +--- .../indices/warmer/put/PutWarmerResponse.java | 15 +++ .../support/master/AcknowledgedRequest.java | 4 +- .../master/AcknowledgedRequestBuilder.java | 52 +++++++++ .../support/master/AcknowledgedResponse.java | 23 ++-- .../MasterNodeOperationRequestBuilder.java | 1 + .../ack/ClusterStateUpdateListener.java | 37 ++++++ .../ack/ClusterStateUpdateRequest.java | 65 +++++++++++ .../ack/ClusterStateUpdateResponse.java | 39 +++++++ .../MetaDataUpdateSettingsService.java | 54 ++++++--- .../settings/RestUpdateSettingsAction.java | 29 +---- .../elasticsearch/cluster/ack/AckTests.java | 105 ++++++++++++++++++ .../warmer/SimpleIndicesWarmerTests.java | 40 ------- 22 files changed, 507 insertions(+), 162 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java create mode 100644 src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java create mode 100644 src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java create mode 100644 src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java create mode 100644 src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java create mode 100644 src/test/java/org/elasticsearch/cluster/ack/AckTests.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java index fc2bf19d0f1..3e4aa30baae 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -67,10 +69,17 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA @Override protected void masterOperation(final UpdateSettingsRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() { + + UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest() + .indices(request.indices()) + .settings(request.settings()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()); + + updateSettingsService.updateSettings(clusterStateUpdateRequest, new ClusterStateUpdateListener() { @Override - public void onSuccess() { - listener.onResponse(new UpdateSettingsResponse()); + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new UpdateSettingsResponse(response.isAcknowledged())); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java new file mode 100644 index 00000000000..5130fa67271 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.settings; + +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; +import org.elasticsearch.common.settings.Settings; + +/** + * Cluster state update request that allows to update settings for some indices + */ +public class UpdateSettingsClusterStateUpdateRequest extends ClusterStateUpdateRequest { + private Settings settings; + private String[] indices; + + public UpdateSettingsClusterStateUpdateRequest() { + + } + + /** + * Returns the indices that needs to be updated + */ + public String[] indices() { + return indices; + } + + /** + * Sets the indices to update + */ + public UpdateSettingsClusterStateUpdateRequest indices(String[] indices) { + this.indices = indices; + return this; + } + + /** + * Returns the {@link Settings} to update + */ + public Settings settings() { + return settings; + } + + /** + * Sets the {@link Settings} to update + */ + public UpdateSettingsClusterStateUpdateRequest settings(Settings settings) { + this.settings = settings; + return this; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java index f2fcec278ed..d32459cfbb1 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsRequest.java @@ -20,8 +20,9 @@ package org.elasticsearch.action.admin.indices.settings; import org.elasticsearch.ElasticSearchGenerationException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; @@ -39,9 +40,9 @@ import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFr import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream; /** - * + * Request for an update index settings action */ -public class UpdateSettingsRequest extends MasterNodeOperationRequest { +public class UpdateSettingsRequest extends AcknowledgedRequest { private String[] indices; @@ -51,14 +52,14 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest { +public class UpdateSettingsRequestBuilder extends AcknowledgedRequestBuilder { public UpdateSettingsRequestBuilder(IndicesAdminClient indicesClient, String... indices) { super((InternalIndicesAdminClient) indicesClient, new UpdateSettingsRequest(indices)); } + /** + * Sets the indices the update settings will execute on + */ public UpdateSettingsRequestBuilder setIndices(String... indices) { request.indices(indices); return this; } /** - * The settings update. + * Sets the settings to be updated */ public UpdateSettingsRequestBuilder setSettings(Settings settings) { request.settings(settings); @@ -50,7 +53,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update. + * Sets the settings to be updated */ public UpdateSettingsRequestBuilder setSettings(Settings.Builder settings) { request.settings(settings); @@ -58,7 +61,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update (either json/yaml/properties format) + * Sets the settings to be updated (either json/yaml/properties format) */ public UpdateSettingsRequestBuilder setSettings(String source) { request.settings(source); @@ -66,7 +69,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil } /** - * The settings to update (either json/yaml/properties format) + * Sets the settings to be updated (either json/yaml/properties format) */ public UpdateSettingsRequestBuilder setSettings(Map source) { request.settings(source); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java index fb9397ceb5a..9ce139477e1 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/UpdateSettingsResponse.java @@ -19,27 +19,34 @@ package org.elasticsearch.action.admin.indices.settings; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; /** - * A response for a update settings action. + * A response for an update index settings action */ -public class UpdateSettingsResponse extends ActionResponse { +public class UpdateSettingsResponse extends AcknowledgedResponse { UpdateSettingsResponse() { } + UpdateSettingsResponse(boolean acknowledged) { + super(acknowledged); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + readAcknowledged(in, Version.V_0_90_6); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + writeAcknowledged(out, Version.V_0_90_6); } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java index ea5ecbb7006..35155d92c54 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java @@ -26,12 +26,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; - /** * A request to delete an index warmer. */ @@ -41,8 +38,6 @@ public class DeleteWarmerRequest extends AcknowledgedRequest { +public class DeleteWarmerRequestBuilder extends AcknowledgedRequestBuilder { public DeleteWarmerRequestBuilder(IndicesAdminClient indicesClient) { super((InternalIndicesAdminClient) indicesClient, new DeleteWarmerRequest()); @@ -48,23 +47,6 @@ public class DeleteWarmerRequestBuilder extends MasterNodeOperationRequestBuilde return this; } - /** - * Sets the maximum wait for acknowledgement from other nodes - */ - public DeleteWarmerRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return this; - } - - /** - * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults - * to 10s. - */ - public DeleteWarmerRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).deleteWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java index 3a3fee6880d..6b83fed11fd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java @@ -20,6 +20,10 @@ package org.elasticsearch.action.admin.indices.warmer.delete; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * A response for a delete warmer. @@ -33,4 +37,16 @@ public class DeleteWarmerResponse extends AcknowledgedResponse { DeleteWarmerResponse(boolean acknowledged) { super(acknowledged); } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out, null); + } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java index 69422e954f3..0b59da78b79 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequest.java @@ -41,9 +41,7 @@ public class PutWarmerRequest extends AcknowledgedRequest { private SearchRequest searchRequest; - PutWarmerRequest() { - } /** diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java index 77b2143dc7e..759d88f1dc2 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerRequestBuilder.java @@ -22,15 +22,14 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient; -import org.elasticsearch.common.unit.TimeValue; /** * */ -public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder { +public class PutWarmerRequestBuilder extends AcknowledgedRequestBuilder { public PutWarmerRequestBuilder(IndicesAdminClient indicesClient, String name) { super((InternalIndicesAdminClient) indicesClient, new PutWarmerRequest().name(name)); @@ -64,23 +63,6 @@ public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder

10s. - */ - public PutWarmerRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return this; - } - @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).putWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java index 58dd2ece8ac..114fb1d5a93 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/PutWarmerResponse.java @@ -20,6 +20,10 @@ package org.elasticsearch.action.admin.indices.warmer.put; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * The response of put warmer operation. @@ -34,4 +38,15 @@ public class PutWarmerResponse extends AcknowledgedResponse { super(acknowledged); } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + readAcknowledged(in, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + writeAcknowledged(out, null); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index 2c61539f18d..02fb99925fd 100644 --- a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -38,11 +38,13 @@ public abstract class AcknowledgedRequest protected AcknowledgedRequest() { } + /** * Allows to set the timeout * @param timeout timeout as a string (e.g. 1s) * @return the request itself */ + @SuppressWarnings("unchecked") public final T timeout(String timeout) { this.timeout = TimeValue.parseTimeValue(timeout, this.timeout); return (T)this; @@ -53,6 +55,7 @@ public abstract class AcknowledgedRequest * @param timeout timeout as a {@link TimeValue} * @return the request itself */ + @SuppressWarnings("unchecked") public final T timeout(TimeValue timeout) { this.timeout = timeout; return (T) this; @@ -83,5 +86,4 @@ public abstract class AcknowledgedRequest timeout.writeTo(out); } } - } diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java new file mode 100644 index 00000000000..4d5c5afebf9 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java @@ -0,0 +1,52 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.support.master; + +import org.elasticsearch.client.internal.InternalGenericClient; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Base request builder for master node operations that support acknowledgements + */ +public abstract class AcknowledgedRequestBuilder, Response extends AcknowledgedResponse, RequestBuilder extends AcknowledgedRequestBuilder> + extends MasterNodeOperationRequestBuilder { + + protected AcknowledgedRequestBuilder(InternalGenericClient client, Request request) { + super(client, request); + } + + /** + * Sets the maximum wait for acknowledgement from other nodes + */ + @SuppressWarnings("unchecked") + public RequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return (RequestBuilder)this; + } + + /** + * Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults + * to 10s. + */ + @SuppressWarnings("unchecked") + public RequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return (RequestBuilder)this; + } +} diff --git a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java index 99b46fbda61..a8067daa99d 100644 --- a/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java +++ b/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java @@ -18,6 +18,7 @@ package org.elasticsearch.action.support.master; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -48,15 +49,21 @@ public abstract class AcknowledgedResponse extends ActionResponse { return acknowledged; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - acknowledged = in.readBoolean(); + /** + * Reads the timeout value if on or after the specified min version or if the version is null. + */ + protected void readAcknowledged(StreamInput in, Version minVersion) throws IOException { + if (minVersion == null || in.getVersion().onOrAfter(minVersion)) { + acknowledged = in.readBoolean(); + } } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(acknowledged); + /** + * Writes the timeout value if on or after the specified min version or if the version is null. + */ + protected void writeAcknowledged(StreamOutput out, Version minVersion) throws IOException { + if (minVersion == null || out.getVersion().onOrAfter(minVersion)) { + out.writeBoolean(acknowledged); + } } } diff --git a/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java b/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java index 7c29bef6ac5..cc92dc11fbd 100644 --- a/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/support/master/MasterNodeOperationRequestBuilder.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.InternalGenericClient; import org.elasticsearch.common.unit.TimeValue; /** + * Base request builder for master node operations */ public abstract class MasterNodeOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends MasterNodeOperationRequestBuilder> extends ActionRequestBuilder { diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java new file mode 100644 index 00000000000..3887eb4df4d --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * Listener used for cluster state updates processing + * Supports acknowledgement logic + */ +public interface ClusterStateUpdateListener { + + /** + * Called when the cluster state update is acknowledged + */ + void onResponse(ClusterStateUpdateResponse response); + + /** + * Called when any error is thrown during the cluster state update processing + */ + void onFailure(Throwable t); +} diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java new file mode 100644 index 00000000000..1d3d8128b59 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +import org.elasticsearch.common.unit.TimeValue; + +/** + * Base class to be used when needing to update the cluster state + * Contains the basic fields that are always needed + */ +public abstract class ClusterStateUpdateRequest> { + + private TimeValue ackTimeout; + private TimeValue masterNodeTimeout; + + /** + * Returns the maximum time interval to wait for acknowledgements + */ + public TimeValue ackTimeout() { + return ackTimeout; + } + + /** + * Sets the acknowledgement timeout + */ + @SuppressWarnings("unchecked") + public T ackTimeout(TimeValue ackTimeout) { + this.ackTimeout = ackTimeout; + return (T) this; + } + + /** + * Returns the maximum time interval to wait for the request to + * be completed on the master node + */ + public TimeValue masterNodeTimeout() { + return masterNodeTimeout; + } + + /** + * Sets the master node timeout + */ + @SuppressWarnings("unchecked") + public T masterNodeTimeout(TimeValue masterNodeTimeout) { + this.masterNodeTimeout = masterNodeTimeout; + return (T) this; + } +} diff --git a/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java new file mode 100644 index 00000000000..a2633269a6c --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * Base response returned after a cluster state update + */ +public class ClusterStateUpdateResponse { + + private final boolean acknowledged; + + public ClusterStateUpdateResponse(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + /** + * Whether the cluster state update was acknowledged or not + */ + public boolean isAcknowledged() { + return acknowledged; + } +} diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index cae74163632..795ba87c786 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -21,13 +21,18 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.admin.indices.settings.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ack.ClusterStateUpdateListener; +import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -41,7 +46,7 @@ import java.util.*; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; /** - * + * Service responsible for submitting update index settings requests */ public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener { @@ -120,9 +125,14 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build(); final List indices = nrReplicasChanged.get(fNumberOfReplicas); - updateSettings(settings, indices.toArray(new String[indices.size()]), TimeValue.timeValueMinutes(10), new Listener() { + UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest() + .indices(indices.toArray(new String[indices.size()])).settings(settings) + .ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here + .masterNodeTimeout(TimeValue.timeValueMinutes(10)); + + updateSettings(updateRequest, new ClusterStateUpdateListener() { @Override - public void onSuccess() { + public void onResponse(ClusterStateUpdateResponse response) { for (String index : indices) { logger.info("[{}] auto expanded replicas to [{}]", index, fNumberOfReplicas); } @@ -139,9 +149,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } } - public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) { + public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); - for (Map.Entry entry : pSettings.getAsMap().entrySet()) { + for (Map.Entry entry : request.settings().getAsMap().entrySet()) { if (entry.getKey().equals("index")) { continue; } @@ -186,10 +196,31 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } final Settings openSettings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterStateUpdateResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + @Override public TimeValue timeout() { - return masterTimeout; + return request.masterNodeTimeout(); } @Override @@ -199,7 +230,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements @Override public ClusterState execute(ClusterState currentState) { - String[] actualIndices = currentState.metaData().concreteIndices(indices); + String[] actualIndices = currentState.metaData().concreteIndices(request.indices()); RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable()); MetaData.Builder metaDataBuilder = MetaData.newMetaDataBuilder().metaData(currentState.metaData()); @@ -296,14 +327,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onSuccess(); } }); } - - public static interface Listener { - void onSuccess(); - - void onFailure(Throwable t); - } } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java index 2325e0d62dc..46a6c8d6f8f 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.admin.indices.settings; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.client.Client; @@ -28,16 +27,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.*; -import org.elasticsearch.rest.action.support.RestXContentBuilder; import java.io.IOException; import java.util.Map; import static org.elasticsearch.client.Requests.updateSettingsRequest; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; -import static org.elasticsearch.rest.RestStatus.OK; /** * @@ -55,6 +51,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index"))); updateSettingsRequest.listenerThreaded(false); + updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout())); updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout())); ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder(); @@ -88,28 +85,6 @@ public class RestUpdateSettingsAction extends BaseRestHandler { } updateSettingsRequest.settings(updateSettings); - client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener() { - @Override - public void onResponse(UpdateSettingsResponse updateSettingsResponse) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject() - .field("ok", true) - .endObject(); - channel.sendResponse(new XContentRestResponse(request, OK, builder)); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } - } - }); + client.admin().indices().updateSettings(updateSettingsRequest, new AcknowledgedRestResponseActionListener(request, channel, logger)); } } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java new file mode 100644 index 00000000000..45166d32b0d --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -0,0 +1,105 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; +import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; +import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.warmer.IndexWarmersMetaData; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.Map; + +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = SUITE) +public class AckTests extends AbstractIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + //to test that the acknowledgement mechanism is working we better disable the wait for publish + //otherwise the operation is most likely acknowledged even if it doesn't support ack + return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build(); + } + + @Test + public void testUpdateSettingsAcknowledgement() { + createIndex("test"); + + UpdateSettingsResponse updateSettingsResponse = client().admin().indices().prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder().put("refresh_interval", 9999)).get(); + assertThat(updateSettingsResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get(); + String refreshInterval = clusterStateResponse.getState().metaData().index("test").settings().get("index.refresh_interval"); + assertThat(refreshInterval, equalTo("9999")); + } + } + + @Test + public void testPutWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); + } + } + + @Test + public void testDeleteWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + } +} diff --git a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java index af60d76290f..148a48c6e2e 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.query.QueryBuilders; @@ -226,43 +225,4 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet(); return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total(); } - - @Test - public void testPutWarmerAcknowledgement() { - createIndex("test"); - ensureGreen(); - - PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") - .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) - .get(); - assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); - - for (Client client : clients()) { - GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); - assertThat(getWarmersResponse.warmers().size(), equalTo(1)); - Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); - assertThat(entry.getKey(), equalTo("test")); - assertThat(entry.getValue().size(), equalTo(1)); - assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); - } - } - - @Test - public void testDeleteWarmerAcknowledgement() { - createIndex("test"); - ensureGreen(); - - PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") - .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) - .get(); - assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); - - DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); - assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); - - for (Client client : clients()) { - GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); - assertThat(getWarmersResponse.warmers().size(), equalTo(0)); - } - } }