Put warmer api to support acknowledgements

Added support for acknowledgements in put warmer api using the generic mechanism introduced in #3786

Closes #3831
This commit is contained in:
Luca Cavanna 2013-10-05 00:02:06 +02:00
parent 55f1eab09a
commit 31142ae471
10 changed files with 270 additions and 10 deletions

View File

@ -19,27 +19,35 @@
package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
* A request to put a search warmer.
*/
public class PutWarmerRequest extends MasterNodeOperationRequest<PutWarmerRequest> {
public class PutWarmerRequest extends MasterNodeOperationRequest<PutWarmerRequest>
implements AcknowledgedRequest<PutWarmerRequest> {
private String name;
private SearchRequest searchRequest;
private TimeValue timeout = timeValueSeconds(10);
PutWarmerRequest() {
}
@ -86,6 +94,22 @@ public class PutWarmerRequest extends MasterNodeOperationRequest<PutWarmerReques
return this.searchRequest;
}
@Override
public PutWarmerRequest timeout(String timeout) {
return this;
}
@Override
public PutWarmerRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
@Override
public TimeValue timeout() {
return timeout;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = searchRequest.validate();
@ -103,6 +127,9 @@ public class PutWarmerRequest extends MasterNodeOperationRequest<PutWarmerReques
searchRequest = new SearchRequest();
searchRequest.readFrom(in);
}
if (in.getVersion().onOrAfter(Version.V_0_90_6)) {
timeout = readTimeValue(in);
}
}
@Override
@ -115,5 +142,8 @@ public class PutWarmerRequest extends MasterNodeOperationRequest<PutWarmerReques
out.writeBoolean(true);
searchRequest.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_0_90_6)) {
timeout.writeTo(out);
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*
@ -63,6 +64,23 @@ public class PutWarmerRequestBuilder extends MasterNodeOperationRequestBuilder<P
return this;
}
/**
* Sets the maximum wait for acknowledgement from other nodes
*/
public PutWarmerRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the operation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public PutWarmerRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
@Override
protected void doExecute(ActionListener<PutWarmerResponse> listener) {
((IndicesAdminClient) client).putWarmer(request, listener);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,7 +29,7 @@ import java.io.IOException;
/**
* The response of put warmer operation.
*/
public class PutWarmerResponse extends ActionResponse {
public class PutWarmerResponse extends ActionResponse implements AcknowledgedResponse {
private boolean acknowledged;

View File

@ -24,13 +24,15 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -96,7 +98,27 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
return;
}
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new AckedClusterStateUpdateTask() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new PutWarmerResponse(true));
}
@Override
public void onAckTimeout() {
listener.onResponse(new PutWarmerResponse(false));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
@ -161,7 +183,7 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new PutWarmerResponse(true));
}
});
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.unit.TimeValue;
/**
* Interface that allows to mark action requests that support acknowledgements.
* Facilitates consistency across different api.
*/
public interface AcknowledgedRequest<T extends ActionRequest<T>> {
/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)
* @return the request itself
*/
T timeout(String timeout);
/**
* Allows to set the timeout
* @param timeout timeout as a {@link TimeValue}
* @return the request itself
*/
T timeout(TimeValue timeout);
/**
* Returns the current timeout
* @return the current timeout as a {@link TimeValue}
*/
TimeValue timeout();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.action.support.master;
/**
* Interface that allows to mark action responses that support acknowledgements.
* Facilitates consistency across different api.
*/
public interface AcknowledgedResponse {
/**
* Returns whether the response is acknowledged or not
* @return true if the response is acknowledged, false otherwise
*/
boolean isAcknowledged();
}

View File

@ -55,6 +55,7 @@ public class RestPutWarmerAction extends BaseRestHandler {
.types(Strings.splitStringByCommaToArray(request.param("type")))
.source(request.content(), request.contentUnsafe());
putWarmerRequest.searchRequest(searchRequest);
putWarmerRequest.timeout(request.paramAsTime("timeout", putWarmerRequest.timeout()));
putWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWarmerRequest.masterNodeTimeout()));
client.admin().indices().putWarmer(putWarmerRequest, new ActionListener<PutWarmerResponse>() {
@Override

View File

@ -0,0 +1,76 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.action.admin.indices.warmer.put;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import static org.hamcrest.CoreMatchers.equalTo;
public class PutWarmerRequestTests extends ElasticsearchTestCase {
@Test
public void testPutWarmerTimeoutBwComp_Pre0906Format() throws Exception {
PutWarmerRequest outRequest = new PutWarmerRequest("warmer1");
outRequest.timeout(TimeValue.timeValueMillis(1000));
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(Version.V_0_90_0);
outRequest.writeTo(out);
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
esBuffer.setVersion(Version.V_0_90_0);
PutWarmerRequest inRequest = new PutWarmerRequest();
inRequest.readFrom(esBuffer);
assertThat(inRequest.name(), equalTo("warmer1"));
//timeout is default as we don't read it from the received buffer
assertThat(inRequest.timeout().millis(), equalTo(new PutWarmerRequest().timeout().millis()));
}
@Test
public void testPutWarmerTimeoutBwComp_Post0906Format() throws Exception {
PutWarmerRequest outRequest = new PutWarmerRequest("warmer1");
outRequest.timeout(TimeValue.timeValueMillis(1000));
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(Version.V_0_90_6);
outRequest.writeTo(out);
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
esBuffer.setVersion(Version.V_0_90_6);
PutWarmerRequest inRequest = new PutWarmerRequest();
inRequest.readFrom(esBuffer);
assertThat(inRequest.name(), equalTo("warmer1"));
//timeout is default as we don't read it from the received buffer
assertThat(inRequest.timeout().millis(), equalTo(outRequest.timeout().millis()));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.warmer;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
@ -58,12 +59,14 @@ public class LocalGatewayIndicesWarmerTests extends AbstractIntegrationTest {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
client().admin().indices().preparePutWarmer("warmer_1")
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1")
.setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value1")))
.execute().actionGet();
client().admin().indices().preparePutWarmer("warmer_2")
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2")
.setSearchRequest(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "value2")))
.execute().actionGet();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
logger.info("--> put template with warmer");
client().admin().indices().preparePutTemplate("template_1")

View File

@ -19,8 +19,11 @@
package org.elasticsearch.indices.warmer;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.QueryBuilders;
@ -30,6 +33,8 @@ import org.elasticsearch.test.AbstractIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -45,12 +50,14 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
.execute().actionGet();
ensureGreen();
client().admin().indices().preparePutWarmer("warmer_1")
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_1")
.setSearchRequest(client().prepareSearch("test").setTypes("a1").setQuery(QueryBuilders.termQuery("field", "value1")))
.execute().actionGet();
client().admin().indices().preparePutWarmer("warmer_2")
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
putWarmerResponse = client().admin().indices().preparePutWarmer("warmer_2")
.setSearchRequest(client().prepareSearch("test").setTypes("a2").setQuery(QueryBuilders.termQuery("field", "value2")))
.execute().actionGet();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
client().prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource("field", "value2").setRefresh(true).execute().actionGet();
@ -172,9 +179,10 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
.execute().actionGet();
ensureGreen();
client().admin().indices().preparePutWarmer("custom_warmer")
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.execute().actionGet();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
client().prepareIndex("test", "test", "1").setSource("foo", "bar").setRefresh(true).execute().actionGet();
@ -193,4 +201,24 @@ public class SimpleIndicesWarmerTests extends AbstractIntegrationTest {
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").clear().setWarmer(true).execute().actionGet();
return indicesStatsResponse.getIndex("test").getPrimaries().warmer.total();
}
@Test
public void testPutWarmerAcknowledgement() {
createIndex("test");
ensureGreen();
PutWarmerResponse putWarmerResponse = client().admin().indices().preparePutWarmer("custom_warmer")
.setSearchRequest(client().prepareSearch("test").setTypes("test").setQuery(QueryBuilders.matchAllQuery()))
.get();
assertThat(putWarmerResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
GetWarmersResponse getWarmersResponse = client.admin().indices().prepareGetWarmers().setLocal(true).get();
assertThat(getWarmersResponse.warmers().size(), equalTo(1));
Map.Entry<String,ImmutableList<IndexWarmersMetaData.Entry>> entry = getWarmersResponse.warmers().entrySet().iterator().next();
assertThat(entry.getKey(), equalTo("test"));
assertThat(entry.getValue().size(), equalTo(1));
assertThat(entry.getValue().get(0).name(), equalTo("custom_warmer"));
}
}
}