From ce2888266051475bb53a406c658b93299ab3a837 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 26 Sep 2010 09:07:37 +0200 Subject: [PATCH] add refresh option to index/create/delete opereation, REST allows for refresh parameter (defaults to false) --- .../action/delete/DeleteRequest.java | 17 ++++++++++++ .../action/delete/TransportDeleteAction.java | 7 ++++- .../action/index/IndexRequest.java | 18 +++++++++++++ .../action/index/TransportIndexAction.java | 11 ++++++-- .../action/delete/DeleteRequestBuilder.java | 10 +++++++ .../action/index/IndexRequestBuilder.java | 10 +++++++ .../elasticsearch/index/engine/Engine.java | 27 +++++++++++++++++++ .../index/engine/robin/RobinEngine.java | 9 +++++++ .../rest/action/delete/RestDeleteAction.java | 3 ++- .../rest/action/index/RestIndexAction.java | 3 ++- .../document/DocumentActionsTests.java | 2 +- 11 files changed, 111 insertions(+), 6 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index ff61801a0ae..fa390df707e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -47,6 +47,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest { private String type; private String id; + private boolean refresh; /** * Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)} @@ -154,16 +155,32 @@ public class DeleteRequest extends ShardReplicationOperationRequest { return this; } + /** + * Should a refresh be executed post this index operation causing the operation to + * be searchable. Note, heavy indexing should not set this to true. Defaults + * to false. + */ + public DeleteRequest refresh(boolean refresh) { + this.refresh = refresh; + return this; + } + + public boolean refresh() { + return this.refresh; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); type = in.readUTF(); id = in.readUTF(); + refresh = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeUTF(type); out.writeUTF(id); + out.writeBoolean(refresh); } @Override public String toString() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 8f33104d5f3..e6c67615708 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -33,6 +33,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -102,7 +104,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; - indexShard(shardRequest).delete(request.type(), request.id()); + IndexShard indexShard = indexShard(shardRequest); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id()); + delete.refresh(request.refresh()); + indexShard.delete(delete); } @Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index ec01a480603..5aab211391d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -113,6 +113,8 @@ public class IndexRequest extends ShardReplicationOperationRequest { private OpType opType = OpType.INDEX; + private boolean refresh = false; + public IndexRequest() { } @@ -390,6 +392,20 @@ public class IndexRequest extends ShardReplicationOperationRequest { return this.opType; } + /** + * Should a refresh be executed post this index operation causing the operation to + * be searchable. Note, heavy indexing should not set this to true. Defaults + * to false. + */ + public IndexRequest refresh(boolean refresh) { + this.refresh = refresh; + return this; + } + + public boolean refresh() { + return this.refresh; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); type = in.readUTF(); @@ -404,6 +420,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { in.readFully(source); opType = OpType.fromId(in.readByte()); + refresh = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { @@ -418,6 +435,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { out.writeVInt(sourceLength); out.writeBytes(source, sourceOffset, sourceLength); out.writeByte(opType.id()); + out.writeBoolean(refresh); } @Override public String toString() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 93921004999..8a925a83012 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -35,9 +35,11 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.UUID; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -127,12 +129,17 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { + IndexShard indexShard = indexShard(shardRequest); final IndexRequest request = shardRequest.request; ParsedDocument doc; if (request.opType() == IndexRequest.OpType.INDEX) { - doc = indexShard(shardRequest).index(request.type(), request.id(), request.source()); + Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source()); + index.refresh(request.refresh()); + doc = indexShard.index(index); } else { - doc = indexShard(shardRequest).create(request.type(), request.id(), request.source()); + Engine.Create create = indexShard(shardRequest).prepareCreate(request.type(), request.id(), request.source()); + create.refresh(request.refresh()); + doc = indexShard(shardRequest).create(create); } if (doc.mappersAdded()) { updateMappingOnMaster(request); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/delete/DeleteRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/delete/DeleteRequestBuilder.java index 5f688907114..b2ab04a3628 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/delete/DeleteRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/delete/DeleteRequestBuilder.java @@ -63,6 +63,16 @@ public class DeleteRequestBuilder extends BaseRequestBuildertrue. Defaults + * to false. + */ + public DeleteRequestBuilder setRefresh(boolean refresh) { + request.refresh(refresh); + return this; + } + /** * Should the listener be called on a separate thread if needed. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java index c617398db56..b227b97aef5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java @@ -182,6 +182,16 @@ public class IndexRequestBuilder extends BaseRequestBuildertrue. Defaults + * to false. + */ + public IndexRequestBuilder setRefresh(boolean refresh) { + request.refresh(refresh); + return this; + } + /** * Set the replication type for this operation. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index 28405ba3274..a2a623a0b6b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -265,6 +265,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { static class Create implements Operation { private final ParsedDocument doc; private final Analyzer analyzer; + private boolean refresh; public Create(ParsedDocument doc, Analyzer analyzer) { this.doc = doc; @@ -298,12 +299,21 @@ public interface Engine extends IndexShardComponent, CloseableComponent { public byte[] source() { return this.doc.source(); } + + public boolean refresh() { + return refresh; + } + + public void refresh(boolean refresh) { + this.refresh = refresh; + } } static class Index implements Operation { private final Term uid; private final ParsedDocument doc; private final Analyzer analyzer; + private boolean refresh; public Index(Term uid, ParsedDocument doc, Analyzer analyzer) { this.uid = uid; @@ -342,10 +352,19 @@ public interface Engine extends IndexShardComponent, CloseableComponent { public byte[] source() { return this.doc.source(); } + + public boolean refresh() { + return refresh; + } + + public void refresh(boolean refresh) { + this.refresh = refresh; + } } static class Delete implements Operation { private final Term uid; + private boolean refresh; public Delete(Term uid) { this.uid = uid; @@ -358,6 +377,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent { public Term uid() { return this.uid; } + + public boolean refresh() { + return refresh; + } + + public void refresh(boolean refresh) { + this.refresh = refresh; + } } static class DeleteByQuery { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index fc11e5565dc..d7b4c0a27c9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -244,6 +244,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, writer.addDocument(create.doc(), create.analyzer()); translog.add(new Translog.Create(create)); dirty = true; + if (create.refresh()) { + refresh(new Refresh(false)); + } } catch (IOException e) { throw new CreateFailedEngineException(shardId, create, e); } finally { @@ -261,6 +264,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, writer.updateDocument(index.uid(), index.doc(), index.analyzer()); translog.add(new Translog.Index(index)); dirty = true; + if (index.refresh()) { + refresh(new Refresh(false)); + } } catch (IOException e) { throw new IndexFailedEngineException(shardId, index, e); } finally { @@ -278,6 +284,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, writer.deleteDocuments(delete.uid()); translog.add(new Translog.Delete(delete)); dirty = true; + if (delete.refresh()) { + refresh(new Refresh(false)); + } } catch (IOException e) { throw new DeleteFailedEngineException(shardId, delete, e); } finally { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 4ab028d3103..a52df889ed0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -36,7 +36,7 @@ import static org.elasticsearch.rest.RestRequest.Method.*; import static org.elasticsearch.rest.RestResponse.Status.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class RestDeleteAction extends BaseRestHandler { @@ -48,6 +48,7 @@ public class RestDeleteAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id")); deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT)); + deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh())); // we just send a response, no need to fork deleteRequest.listenerThreaded(false); // we don't spawn, then fork if local diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index eb898ee8ecf..a22aafbca77 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -36,7 +36,7 @@ import static org.elasticsearch.rest.RestRequest.Method.*; import static org.elasticsearch.rest.RestResponse.Status.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class RestIndexAction extends BaseRestHandler { @@ -60,6 +60,7 @@ public class RestIndexAction extends BaseRestHandler { IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); + indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh())); String sOpType = request.param("op_type"); if (sOpType != null) { if ("index".equals(sOpType)) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java index adbe637fb27..5dfe0e9726b 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java @@ -111,7 +111,7 @@ public class DocumentActionsTests extends AbstractNodesTests { assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); logger.info("Indexing [type1/1]"); - IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).execute().actionGet(); + IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).setRefresh(true).execute().actionGet(); assertThat(indexResponse.index(), equalTo(getConcreteIndexName())); assertThat(indexResponse.id(), equalTo("1")); assertThat(indexResponse.type(), equalTo("type1"));