Delete warmer api to support acknowledgements

Added support for acknowledgements in delete warmer api using the generic mechanism introduced in#3786

Closes #3833
This commit is contained in:
Luca Cavanna 2013-10-05 00:32:24 +02:00
parent 31142ae471
commit fcf13e0fa7
8 changed files with 204 additions and 6 deletions

View File

@ -19,24 +19,33 @@
package org.elasticsearch.action.admin.indices.warmer.delete; package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/** /**
* A request to delete an index warmer. * A request to delete an index warmer.
*/ */
public class DeleteWarmerRequest extends MasterNodeOperationRequest<DeleteWarmerRequest> { public class DeleteWarmerRequest extends MasterNodeOperationRequest<DeleteWarmerRequest>
implements AcknowledgedRequest<DeleteWarmerRequest> {
private String name; private String name;
private String[] indices = Strings.EMPTY_ARRAY; private String[] indices = Strings.EMPTY_ARRAY;
private TimeValue timeout = timeValueSeconds(10);
DeleteWarmerRequest() { DeleteWarmerRequest() {
} }
@ -87,11 +96,31 @@ public class DeleteWarmerRequest extends MasterNodeOperationRequest<DeleteWarmer
return indices; return indices;
} }
@Override
public DeleteWarmerRequest timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, this.timeout);
return this;
}
@Override
public DeleteWarmerRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
@Override
public TimeValue timeout() {
return timeout;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
name = in.readOptionalString(); name = in.readOptionalString();
indices = in.readStringArray(); indices = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
timeout = readTimeValue(in);
}
} }
@Override @Override
@ -99,5 +128,8 @@ public class DeleteWarmerRequest extends MasterNodeOperationRequest<DeleteWarmer
super.writeTo(out); super.writeTo(out);
out.writeOptionalString(name); out.writeOptionalString(name);
out.writeStringArrayNullable(indices); out.writeStringArrayNullable(indices);
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
timeout.writeTo(out);
}
} }
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient; import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.unit.TimeValue;
/** /**
* *
@ -47,6 +48,23 @@ public class DeleteWarmerRequestBuilder extends MasterNodeOperationRequestBuilde
return this; return this;
} }
/**
* Sets the maximum wait for acknowledgement from other nodes
*/
public DeleteWarmerRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public DeleteWarmerRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
@Override @Override
protected void doExecute(ActionListener<DeleteWarmerResponse> listener) { protected void doExecute(ActionListener<DeleteWarmerResponse> listener) {
((IndicesAdminClient) client).deleteWarmer(request, listener); ((IndicesAdminClient) client).deleteWarmer(request, listener);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.warmer.delete; package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,7 +29,7 @@ import java.io.IOException;
/** /**
* A response for a delete warmer. * A response for a delete warmer.
*/ */
public class DeleteWarmerResponse extends ActionResponse { public class DeleteWarmerResponse extends ActionResponse implements AcknowledgedResponse {
private boolean acknowledged; private boolean acknowledged;

View File

@ -23,13 +23,15 @@ import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -88,7 +90,27 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
@Override @Override
protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener<DeleteWarmerResponse> listener) throws ElasticSearchException { protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener<DeleteWarmerResponse> listener) throws ElasticSearchException {
clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new DeleteWarmerResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new DeleteWarmerResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override @Override
public TimeValue timeout() { public TimeValue timeout() {
@ -161,7 +183,7 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new DeleteWarmerResponse(true));
} }
}); });
} }

View File

@ -52,6 +52,7 @@ public class RestDeleteWarmerAction extends BaseRestHandler {
DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name")) DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name"))
.indices(Strings.splitStringByCommaToArray(request.param("index"))); .indices(Strings.splitStringByCommaToArray(request.param("index")));
deleteWarmerRequest.listenerThreaded(false); deleteWarmerRequest.listenerThreaded(false);
deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout()));
deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout())); deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout()));
client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener<DeleteWarmerResponse>() { client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener<DeleteWarmerResponse>() {
@Override @Override

View File

@ -0,0 +1,78 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.action.admin.indices.warmer.delete;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import static org.hamcrest.CoreMatchers.equalTo;
public class DeleteWarmerRequestTests extends ElasticsearchTestCase {
@Test
public void testDeleteWarmerTimeoutBwComp_Pre0906Format() throws Exception {
DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1");
outRequest.timeout(TimeValue.timeValueMillis(1000));
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(Version.V_0_90_0);
outRequest.writeTo(out);
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
esBuffer.setVersion(Version.V_0_90_0);
DeleteWarmerRequest inRequest = new DeleteWarmerRequest();
inRequest.readFrom(esBuffer);
assertThat(inRequest.name(), equalTo("warmer1"));
//timeout is default as we don't read it from the received buffer
assertThat(inRequest.timeout().millis(), equalTo(new DeleteWarmerRequest().timeout().millis()));
}
@Test
public void testDeleteWarmerTimeoutBwComp_Post0906Format() throws Exception {
DeleteWarmerRequest outRequest = new DeleteWarmerRequest("warmer1");
outRequest.timeout(TimeValue.timeValueMillis(1000));
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(Version.V_0_90_6);
outRequest.writeTo(out);
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
esBuffer.setVersion(Version.V_0_90_6);
DeleteWarmerRequest inRequest = new DeleteWarmerRequest();
inRequest.readFrom(esBuffer);
assertThat(inRequest.name(), equalTo("warmer1"));
//timeout is default as we don't read it from the received buffer
assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis()));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.warmer; package org.elasticsearch.indices.warmer;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -126,7 +127,8 @@ public class LocalGatewayIndicesWarmerTests extends AbstractIntegrationTest {
logger.info("--> delete warmer warmer_1"); logger.info("--> delete warmer warmer_1");
client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet(); DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("warmer_1").execute().actionGet();
assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true));
logger.info("--> verify warmers (delete) are registered in cluster state"); logger.info("--> verify warmers (delete) are registered in cluster state");
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.warmer;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -172,6 +173,30 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
} }
} }
@Test
public void deleteIndexWarmerTest() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
GetWarmersResponse getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get();
assertThat(getWarmersResponse.warmers().size(), equalTo(1));
Map.Entry<String, ImmutableList<IndexWarmersMetaData.Entry>> entry = getWarmersResponse.warmers().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo("test"));
assertThat(entry.getValue().size(), equalTo(1));
assertThat(entry.getValue().iterator().next().name(), equalTo("custom_warmer"));
DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get();
assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true));
getWarmersResponse = client().admin().indices().prepareGetWarmers("test").get();
assertThat(getWarmersResponse.warmers().size(), equalTo(0));
}
@Test // issue 3246 @Test // issue 3246
public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception { public void ensureThatIndexWarmersCanBeChangedOnRuntime() throws Exception {
client().admin().indices().prepareCreate("test") client().admin().indices().prepareCreate("test")
@ -221,4 +246,23 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer")); assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer"));
} }
} }
@Test
public void testDeleteWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
DeleteWarmerResponse deleteWarmerResponse = client().admin().indices().prepareDeleteWarmer().setIndices("test").setName("custom_warmer").get();
assertThat(deleteWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(0));
}
}
} }