Added support for acknowledgement in update index settings api

Added support for serialization based on version to AcknowledgedResponse. Useful in api that don't support yet the acknowledged flag in the response.
Moved also ack warmer tests to more specific AckTests class

Close #3983
This commit is contained in:
Luca Cavanna 2013-10-26 01:55:32 +02:00
parent f425d4af2f
commit 5f1ebf20f7
22 changed files with 507 additions and 162 deletions

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -67,10 +69,17 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
@Override
protected void masterOperation(final UpdateSettingsRequest request, final ClusterState state, final ActionListener<UpdateSettingsResponse> listener) throws ElasticSearchException {
updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() {
UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(request.indices())
.settings(request.settings())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
updateSettingsService.updateSettings(clusterStateUpdateRequest, new ClusterStateUpdateListener() {
@Override
public void onSuccess() {
listener.onResponse(new UpdateSettingsResponse());
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new UpdateSettingsResponse(response.isAcknowledged()));
}
@Override

View File

@ -0,0 +1,65 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.settings;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.common.settings.Settings;
/**
* Cluster state update request that allows to update settings for some indices
*/
public class UpdateSettingsClusterStateUpdateRequest extends ClusterStateUpdateRequest<UpdateSettingsClusterStateUpdateRequest> {
private Settings settings;
private String[] indices;
public UpdateSettingsClusterStateUpdateRequest() {
}
/**
* Returns the indices that needs to be updated
*/
public String[] indices() {
return indices;
}
/**
* Sets the indices to update
*/
public UpdateSettingsClusterStateUpdateRequest indices(String[] indices) {
this.indices = indices;
return this;
}
/**
* Returns the {@link Settings} to update
*/
public Settings settings() {
return settings;
}
/**
* Sets the {@link Settings} to update
*/
public UpdateSettingsClusterStateUpdateRequest settings(Settings settings) {
this.settings = settings;
return this;
}
}

View File

@ -20,8 +20,9 @@
package org.elasticsearch.action.admin.indices.settings;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -39,9 +40,9 @@ import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFr
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
/**
*
* Request for an update index settings action
*/
public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSettingsRequest> {
public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsRequest> {
private String[] indices;
@ -51,14 +52,14 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
}
/**
* Constructs a new request to create an index with the specified name and settings.
* Constructs a new request to update settings for one or more indices
*/
public UpdateSettingsRequest(String... indices) {
this.indices = indices;
}
/**
* Constructs a new request to create an index with the specified name and settings.
* Constructs a new request to update settings for one or more indices
*/
public UpdateSettingsRequest(Settings settings, String... indices) {
this.indices = indices;
@ -82,13 +83,16 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
return settings;
}
/**
* Sets the indices to apply to settings update to
*/
public UpdateSettingsRequest indices(String... indices) {
this.indices = indices;
return this;
}
/**
* The settings to created the index with.
* Sets the settings to be updated
*/
public UpdateSettingsRequest settings(Settings settings) {
this.settings = settings;
@ -96,7 +100,7 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
}
/**
* The settings to created the index with.
* Sets the settings to be updated
*/
public UpdateSettingsRequest settings(Settings.Builder settings) {
this.settings = settings.build();
@ -104,7 +108,7 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
}
/**
* The settings to crete the index with (either json/yaml/properties format)
* Sets the settings to be updated (either json/yaml/properties format)
*/
public UpdateSettingsRequest settings(String source) {
this.settings = ImmutableSettings.settingsBuilder().loadFromSource(source).build();
@ -112,8 +116,9 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
}
/**
* The settings to crete the index with (either json/yaml/properties format)
* Sets the settings to be updated (either json/yaml/properties format)
*/
@SuppressWarnings("unchecked")
public UpdateSettingsRequest settings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
@ -130,6 +135,7 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
super.readFrom(in);
indices = in.readStringArray();
settings = readSettingsFromStream(in);
readTimeout(in, Version.V_0_90_6);
}
@Override
@ -137,5 +143,6 @@ public class UpdateSettingsRequest extends MasterNodeOperationRequest<UpdateSett
super.writeTo(out);
out.writeStringArrayNullable(indices);
writeSettingsToStream(settings, out);
writeTimeout(out, Version.V_0_90_6);
}
}

View File

@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.indices.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
@ -28,21 +28,24 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
*
* Builder for an update index settings request
*/
public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuilder<UpdateSettingsRequest, UpdateSettingsResponse, UpdateSettingsRequestBuilder> {
public class UpdateSettingsRequestBuilder extends AcknowledgedRequestBuilder<UpdateSettingsRequest, UpdateSettingsResponse, UpdateSettingsRequestBuilder> {
public UpdateSettingsRequestBuilder(IndicesAdminClient indicesClient, String... indices) {
super((InternalIndicesAdminClient) indicesClient, new UpdateSettingsRequest(indices));
}
/**
* Sets the indices the update settings will execute on
*/
public UpdateSettingsRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* The settings update.
* Sets the settings to be updated
*/
public UpdateSettingsRequestBuilder setSettings(Settings settings) {
request.settings(settings);
@ -50,7 +53,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil
}
/**
* The settings to update.
* Sets the settings to be updated
*/
public UpdateSettingsRequestBuilder setSettings(Settings.Builder settings) {
request.settings(settings);
@ -58,7 +61,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil
}
/**
* The settings to update (either json/yaml/properties format)
* Sets the settings to be updated (either json/yaml/properties format)
*/
public UpdateSettingsRequestBuilder setSettings(String source) {
request.settings(source);
@ -66,7 +69,7 @@ public class UpdateSettingsRequestBuilder extends MasterNodeOperationRequestBuil
}
/**
* The settings to update (either json/yaml/properties format)
* Sets the settings to be updated (either json/yaml/properties format)
*/
public UpdateSettingsRequestBuilder setSettings(Map<String, Object> source) {
request.settings(source);

View File

@ -19,27 +19,34 @@
package org.elasticsearch.action.admin.indices.settings;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* A response for a update settings action.
* A response for an update index settings action
*/
public class UpdateSettingsResponse extends ActionResponse {
public class UpdateSettingsResponse extends AcknowledgedResponse {
UpdateSettingsResponse() {
}
UpdateSettingsResponse(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in, Version.V_0_90_6);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out, Version.V_0_90_6);
}
}

View File

@ -26,12 +26,9 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
* A request to delete an index warmer.
*/
@ -41,8 +38,6 @@ public class DeleteWarmerRequest extends AcknowledgedRequest<DeleteWarmerRequest
private String[] indices = Strings.EMPTY_ARRAY;
private TimeValue timeout = timeValueSeconds(10);
DeleteWarmerRequest() {
}
@ -57,8 +52,7 @@ public class DeleteWarmerRequest extends AcknowledgedRequest<DeleteWarmerRequest
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
return validationException;
return null;
}
/**

View File

@ -20,15 +20,14 @@
package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*
*/
public class DeleteWarmerRequestBuilder extends MasterNodeOperationRequestBuilder<DeleteWarmerRequest, DeleteWarmerResponse, DeleteWarmerRequestBuilder> {
public class DeleteWarmerRequestBuilder extends AcknowledgedRequestBuilder<DeleteWarmerRequest, DeleteWarmerResponse, DeleteWarmerRequestBuilder> {
public DeleteWarmerRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new DeleteWarmerRequest());
@ -48,23 +47,6 @@ public class DeleteWarmerRequestBuilder extends MasterNodeOperationRequestBuilde
return this;
}
/**
* Sets the maximum wait for acknowledgement from other nodes
*/
public DeleteWarmerRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public DeleteWarmerRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
@Override
protected void doExecute(ActionListener<DeleteWarmerResponse> listener) {
((IndicesAdminClient) client).deleteWarmer(request, listener);

View File

@ -20,6 +20,10 @@
package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* A response for a delete warmer.
@ -33,4 +37,16 @@ public class DeleteWarmerResponse extends AcknowledgedResponse {
DeleteWarmerResponse(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out, null);
}
}

View File

@ -41,9 +41,7 @@ public class PutWarmerRequest extends AcknowledgedRequest<PutWarmerRequest> {
private SearchRequest searchRequest;
PutWarmerRequest() {
}
/**

View File

@ -22,15 +22,14 @@ package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*
*/
public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder<PutWarmerRequest, PutWarmerResponse, PutWarmerRequestBuilder> {
public class PutWarmerRequestBuilder extends AcknowledgedRequestBuilder<PutWarmerRequest, PutWarmerResponse, PutWarmerRequestBuilder> {
public PutWarmerRequestBuilder(IndicesAdminClient indicesClient, String name) {
super((InternalIndicesAdminClient) indicesClient, new PutWarmerRequest().name(name));
@ -64,23 +63,6 @@ public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder<P
return this;
}
/**
* Sets the maximum wait for acknowledgement from other nodes
*/
public PutWarmerRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public PutWarmerRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
@Override
protected void doExecute(ActionListener<PutWarmerResponse> listener) {
((IndicesAdminClient) client).putWarmer(request, listener);

View File

@ -20,6 +20,10 @@
package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* The response of put warmer operation.
@ -34,4 +38,15 @@ public class PutWarmerResponse extends AcknowledgedResponse {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out, null);
}
}

View File

@ -38,11 +38,13 @@ public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest>
protected AcknowledgedRequest() {
}
/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)
* @return the request itself
*/
@SuppressWarnings("unchecked")
public final T timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, this.timeout);
return (T)this;
@ -53,6 +55,7 @@ public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest>
* @param timeout timeout as a {@link TimeValue}
* @return the request itself
*/
@SuppressWarnings("unchecked")
public final T timeout(TimeValue timeout) {
this.timeout = timeout;
return (T) this;
@ -83,5 +86,4 @@ public abstract class AcknowledgedRequest<T extends MasterNodeOperationRequest>
timeout.writeTo(out);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.client.internal.InternalGenericClient;
import org.elasticsearch.common.unit.TimeValue;
/**
* Base request builder for master node operations that support acknowledgements
*/
public abstract class AcknowledgedRequestBuilder<Request extends AcknowledgedRequest<Request>, Response extends AcknowledgedResponse, RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>>
extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
protected AcknowledgedRequestBuilder(InternalGenericClient client, Request request) {
super(client, request);
}
/**
* Sets the maximum wait for acknowledgement from other nodes
*/
@SuppressWarnings("unchecked")
public RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder)this;
}
/**
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return (RequestBuilder)this;
}
}

View File

@ -18,6 +18,7 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -48,15 +49,21 @@ public abstract class AcknowledgedResponse extends ActionResponse {
return acknowledged;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
/**
* Reads the timeout value if on or after the specified min version or if the version is <code>null</code>.
*/
protected void readAcknowledged(StreamInput in, Version minVersion) throws IOException {
if (minVersion == null || in.getVersion().onOrAfter(minVersion)) {
acknowledged = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
/**
* Writes the timeout value if on or after the specified min version or if the version is <code>null</code>.
*/
protected void writeAcknowledged(StreamOutput out, Version minVersion) throws IOException {
if (minVersion == null || out.getVersion().onOrAfter(minVersion)) {
out.writeBoolean(acknowledged);
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.InternalGenericClient;
import org.elasticsearch.common.unit.TimeValue;
/**
* Base request builder for master node operations
*/
public abstract class MasterNodeOperationRequestBuilder<Request extends MasterNodeOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {

View File

@ -0,0 +1,37 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;
/**
* Listener used for cluster state updates processing
* Supports acknowledgement logic
*/
public interface ClusterStateUpdateListener {
/**
* Called when the cluster state update is acknowledged
*/
void onResponse(ClusterStateUpdateResponse response);
/**
* Called when any error is thrown during the cluster state update processing
*/
void onFailure(Throwable t);
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;
import org.elasticsearch.common.unit.TimeValue;
/**
* Base class to be used when needing to update the cluster state
* Contains the basic fields that are always needed
*/
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> {
private TimeValue ackTimeout;
private TimeValue masterNodeTimeout;
/**
* Returns the maximum time interval to wait for acknowledgements
*/
public TimeValue ackTimeout() {
return ackTimeout;
}
/**
* Sets the acknowledgement timeout
*/
@SuppressWarnings("unchecked")
public T ackTimeout(TimeValue ackTimeout) {
this.ackTimeout = ackTimeout;
return (T) this;
}
/**
* Returns the maximum time interval to wait for the request to
* be completed on the master node
*/
public TimeValue masterNodeTimeout() {
return masterNodeTimeout;
}
/**
* Sets the master node timeout
*/
@SuppressWarnings("unchecked")
public T masterNodeTimeout(TimeValue masterNodeTimeout) {
this.masterNodeTimeout = masterNodeTimeout;
return (T) this;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;
/**
* Base response returned after a cluster state update
*/
public class ClusterStateUpdateResponse {
private final boolean acknowledged;
public ClusterStateUpdateResponse(boolean acknowledged) {
this.acknowledged = acknowledged;
}
/**
* Whether the cluster state update was acknowledged or not
*/
public boolean isAcknowledged() {
return acknowledged;
}
}

View File

@ -21,13 +21,18 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -41,7 +46,7 @@ import java.util.*;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
/**
*
* Service responsible for submitting update index settings requests
*/
public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterStateListener {
@ -120,9 +125,14 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
final List<String> indices = nrReplicasChanged.get(fNumberOfReplicas);
updateSettings(settings, indices.toArray(new String[indices.size()]), TimeValue.timeValueMinutes(10), new Listener() {
UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(indices.toArray(new String[indices.size()])).settings(settings)
.ackTimeout(TimeValue.timeValueMillis(0)) //no need to wait for ack here
.masterNodeTimeout(TimeValue.timeValueMinutes(10));
updateSettings(updateRequest, new ClusterStateUpdateListener() {
@Override
public void onSuccess() {
public void onResponse(ClusterStateUpdateResponse response) {
for (String index : indices) {
logger.info("[{}] auto expanded replicas to [{}]", index, fNumberOfReplicas);
}
@ -139,9 +149,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
}
public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) {
public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : pSettings.getAsMap().entrySet()) {
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (entry.getKey().equals("index")) {
continue;
}
@ -186,10 +196,31 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
final Settings openSettings = updatedSettingsBuilder.build();
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterStateUpdateResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterStateUpdateResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
@Override
public TimeValue timeout() {
return masterTimeout;
return request.masterNodeTimeout();
}
@Override
@ -199,7 +230,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
@Override
public ClusterState execute(ClusterState currentState) {
String[] actualIndices = currentState.metaData().concreteIndices(indices);
String[] actualIndices = currentState.metaData().concreteIndices(request.indices());
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().routingTable(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.newMetaDataBuilder().metaData(currentState.metaData());
@ -296,14 +327,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onSuccess();
}
});
}
public static interface Listener {
void onSuccess();
void onFailure(Throwable t);
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.admin.indices.settings;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.client.Client;
@ -28,16 +27,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.client.Requests.updateSettingsRequest;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.OK;
/**
*
@ -55,6 +51,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index")));
updateSettingsRequest.listenerThreaded(false);
updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout()));
updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout()));
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
@ -88,28 +85,6 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
}
updateSettingsRequest.settings(updateSettings);
client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<UpdateSettingsResponse>() {
@Override
public void onResponse(UpdateSettingsResponse updateSettingsResponse) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject()
.field("ok", true)
.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
client.admin().indices().updateSettings(updateSettingsRequest, new AcknowledgedRestResponseActionListener<UpdateSettingsResponse>(request, channel, logger));
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.AbstractIntegrationTest;
import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = SUITE)
public class AckTests extends AbstractIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
}
@Test
public void testUpdateSettingsAcknowledgement() {
createIndex("test");
UpdateSettingsResponse updateSettingsResponse = client().admin().indices().prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder().put("refresh_interval", 9999)).get();
assertThat(updateSettingsResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get();
String refreshInterval = clusterStateResponse.getState().metaData().index("test").settings().get("index.refresh_interval");
assertThat(refreshInterval, equalTo("9999"));
}
}
@Test
public void testPutWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(1));
Map.Entry<String,ImmutableList<IndexWarmersMetaData.Entry>> entry = getWarmersResponse.warmers().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo("test"));
assertThat(entry.getValue().size(), equalTo(1));
assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer"));
}
}
@Test
public void testDeleteWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get();
assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(0));
}
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.QueryBuilders;
@ -226,43 +225,4 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet();
return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total();
}
@Test
public void testPutWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(1));
Map.Entry<String,ImmutableList<IndexWarmersMetaData.Entry>> entry = getWarmersResponse.warmers().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo("test"));
assertThat(entry.getValue().size(), equalTo(1));
assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer"));
}
}
@Test
public void testDeleteWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get();
assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(0));
}
}
}