From fcf13e0fa79f5ad06000b89440de1da7160b14f8 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Sat, 5 Oct 2013 00:32:24 +0200 Subject: [PATCH] Delete warmer api to support acknowledgements Added support for acknowledgements in delete warmer api using the generic mechanism introduced in#3786 Closes #3833 --- .../warmer/delete/DeleteWarmerRequest.java | 34 +++++++- .../delete/DeleteWarmerRequestBuilder.java | 18 +++++ .../warmer/delete/DeleteWarmerResponse.java | 3 +- .../delete/TransportDeleteWarmerAction.java | 28 ++++++- .../warmer/delete/RestDeleteWarmerAction.java | 1 + .../delete/DeleteWarmerRequestTests.java | 78 +++++++++++++++++++ .../LocalGatewayIndicesWarmerTests.java | 4 +- .../warmer/SimpleIndicesWarmerTests.java | 44 +++++++++++ 8 files changed, 204 insertions(+), 6 deletions(-) create mode 100644 src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java index be04adb88c4..83be17e0404 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequest.java @@ -19,24 +19,33 @@ package org.elasticsearch.action.admin.indices.warmer.delete; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import static org.elasticsearch.common.unit.TimeValue.readTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; + /** * A request to delete an index warmer. */ -public class DeleteWarmerRequest extends MasterNodeOperationRequest { +public class DeleteWarmerRequest extends MasterNodeOperationRequest + implements AcknowledgedRequest { private String name; private String[] indices = Strings.EMPTY_ARRAY; + private TimeValue timeout = timeValueSeconds(10); + DeleteWarmerRequest() { } @@ -87,11 +96,31 @@ public class DeleteWarmerRequest extends MasterNodeOperationRequest10s. + */ + public DeleteWarmerRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + @Override protected void doExecute(ActionListener listener) { ((IndicesAdminClient) client).deleteWarmer(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java index e5d834ae06a..0d8b70efb97 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.warmer.delete; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +29,7 @@ import java.io.IOException; /** * A response for a delete warmer. */ -public class DeleteWarmerResponse extends ActionResponse { +public class DeleteWarmerResponse extends ActionResponse implements AcknowledgedResponse { private boolean acknowledged; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java index d28f98926b6..79648145304 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java @@ -23,13 +23,15 @@ import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; @@ -88,7 +90,27 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct @Override protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() { + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new DeleteWarmerResponse(true)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new DeleteWarmerResponse(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.timeout(); + } @Override public TimeValue timeout() { @@ -161,7 +183,7 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new DeleteWarmerResponse(true)); + } }); } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java index 1388f5dc35e..8682d950cee 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java @@ -52,6 +52,7 @@ public class RestDeleteWarmerAction extends BaseRestHandler { DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name")) .indices(Strings.splitStringByCommaToArray(request.param("index"))); deleteWarmerRequest.listenerThreaded(false); + deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout())); deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout())); client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener() { @Override diff --git a/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java b/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java new file mode 100644 index 00000000000..f647cdba491 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/admin/indices/warmer/delete/DeleteWarmerRequestTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.action.admin.indices.warmer.delete; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class DeleteWarmerRequestTests extends ElasticsearchTestCase { + + @Test + public void testDeleteWarmerTimeoutBwComp_Pre0906Format() throws Exception { + DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_0); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_0); + DeleteWarmerRequest inRequest = new DeleteWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(new DeleteWarmerRequest().timeout().millis())); + + } + + @Test + public void testDeleteWarmerTimeoutBwComp_Post0906Format() throws Exception { + DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1"); + outRequest.timeout(TimeValue.timeValueMillis(1000)); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(Version.V_0_90_6); + outRequest.writeTo(out); + + ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer); + esBuffer.setVersion(Version.V_0_90_6); + DeleteWarmerRequest inRequest = new DeleteWarmerRequest(); + inRequest.readFrom(esBuffer); + + assertThat(inRequest.name(), equalTo("warmer1")); + //timeout is default as we don't read it from the received buffer + assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis())); + + } +} diff --git a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java index 9039e07cce8..70d4918e1fe 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/LocalGatewayIndicesWarmerTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.warmer; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Priority; @@ -126,7 +127,8 @@ public class LocalGatewayIndicesWarmerTests extends AbstractIntegrationTest { logger.info("--> delete warmer warmer_1"); - client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); logger.info("--> verify warmers (delete) are registered in cluster state"); clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); diff --git a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java index 8fc72af3f22..af60d76290f 100644 --- a/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java +++ b/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.warmer; import com.google.common.collect.ImmutableList; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.client.Client; @@ -172,6 +173,30 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest { } } + @Test + public void deleteIndexWarmerTest() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + GetWarmersResponse getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(1)); + Map.Entry> entry = getWarmersResponse.warmers().entrySet().iterator().next(); + assertThat(entry.getKey(), equalTo("test")); + assertThat(entry.getValue().size(), equalTo(1)); + assertThat(entry.getValue().iterator().next().name(), equalTo("custom_warmer")); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + @Test // issue 3246 public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception { client().admin().indices().prepareCreate("test") @@ -221,4 +246,23 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest { assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); } } + + @Test + public void testDeleteWarmerAcknowledgement() { + createIndex("test"); + ensureGreen(); + + PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer") + .setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery())) + .get(); + assertThat(putWarmerResponse.isAcknowledged(), equalTo(true)); + + DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get(); + assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get(); + assertThat(getWarmersResponse.warmers().size(), equalTo(0)); + } + } }