add refresh option to index/create/delete opereation, REST allows for refresh parameter (defaults to false)

This commit is contained in:
kimchy 2010-09-26 09:07:37 +02:00
parent 2288c5d670
commit ce28882660
11 changed files with 111 additions and 6 deletions

View File

@ -47,6 +47,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
private String type;
private String id;
private boolean refresh;
/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
@ -154,16 +155,32 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
return this;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public DeleteRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}
public boolean refresh() {
return this.refresh;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
refresh = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
out.writeBoolean(refresh);
}
@Override public String toString() {

View File

@ -33,6 +33,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -102,7 +104,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
indexShard(shardRequest).delete(request.type(), request.id());
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
delete.refresh(request.refresh());
indexShard.delete(delete);
}
@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {

View File

@ -113,6 +113,8 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private OpType opType = OpType.INDEX;
private boolean refresh = false;
public IndexRequest() {
}
@ -390,6 +392,20 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this.opType;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public IndexRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}
public boolean refresh() {
return this.refresh;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readUTF();
@ -404,6 +420,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
in.readFully(source);
opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -418,6 +435,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());
out.writeBoolean(refresh);
}
@Override public String toString() {

View File

@ -35,9 +35,11 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -127,12 +129,17 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
final IndexRequest request = shardRequest.request;
ParsedDocument doc;
if (request.opType() == IndexRequest.OpType.INDEX) {
doc = indexShard(shardRequest).index(request.type(), request.id(), request.source());
Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source());
index.refresh(request.refresh());
doc = indexShard.index(index);
} else {
doc = indexShard(shardRequest).create(request.type(), request.id(), request.source());
Engine.Create create = indexShard(shardRequest).prepareCreate(request.type(), request.id(), request.source());
create.refresh(request.refresh());
doc = indexShard(shardRequest).create(create);
}
if (doc.mappersAdded()) {
updateMappingOnMaster(request);

View File

@ -63,6 +63,16 @@ public class DeleteRequestBuilder extends BaseRequestBuilder<DeleteRequest, Dele
return this;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public DeleteRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -182,6 +182,16 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
return this;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
* to <tt>false</tt>.
*/
public IndexRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}
/**
* Set the replication type for this operation.
*/

View File

@ -265,6 +265,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Create implements Operation {
private final ParsedDocument doc;
private final Analyzer analyzer;
private boolean refresh;
public Create(ParsedDocument doc, Analyzer analyzer) {
this.doc = doc;
@ -298,12 +299,21 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public byte[] source() {
return this.doc.source();
}
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
}
static class Index implements Operation {
private final Term uid;
private final ParsedDocument doc;
private final Analyzer analyzer;
private boolean refresh;
public Index(Term uid, ParsedDocument doc, Analyzer analyzer) {
this.uid = uid;
@ -342,10 +352,19 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public byte[] source() {
return this.doc.source();
}
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
}
static class Delete implements Operation {
private final Term uid;
private boolean refresh;
public Delete(Term uid) {
this.uid = uid;
@ -358,6 +377,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public Term uid() {
return this.uid;
}
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
}
static class DeleteByQuery {

View File

@ -244,6 +244,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.addDocument(create.doc(), create.analyzer());
translog.add(new Translog.Create(create));
dirty = true;
if (create.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e);
} finally {
@ -261,6 +264,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
translog.add(new Translog.Index(index));
dirty = true;
if (index.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e);
} finally {
@ -278,6 +284,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.deleteDocuments(delete.uid());
translog.add(new Translog.Delete(delete));
dirty = true;
if (delete.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e);
} finally {

View File

@ -36,7 +36,7 @@ import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class RestDeleteAction extends BaseRestHandler {
@ -48,6 +48,7 @@ public class RestDeleteAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh()));
// we just send a response, no need to fork
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local

View File

@ -36,7 +36,7 @@ import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class RestIndexAction extends BaseRestHandler {
@ -60,6 +60,7 @@ public class RestIndexAction extends BaseRestHandler {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
String sOpType = request.param("op_type");
if (sOpType != null) {
if ("index".equals(sOpType)) {

View File

@ -111,7 +111,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
logger.info("Indexing [type1/1]");
IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).execute().actionGet();
IndexResponse indexResponse = client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")).setRefresh(true).execute().actionGet();
assertThat(indexResponse.index(), equalTo(getConcreteIndexName()));
assertThat(indexResponse.id(), equalTo("1"));
assertThat(indexResponse.type(), equalTo("type1"));