allow to force flush and use it when optimizing

This commit is contained in:
Shay Banon 2011-09-09 15:21:22 +03:00
parent bf70836e92
commit 052f9aac1f
6 changed files with 50 additions and 8 deletions

View File

@ -42,6 +42,8 @@ public class FlushRequest extends BroadcastOperationRequest {
private boolean refresh = false;
private boolean force = false;
private boolean full = false;
FlushRequest() {
@ -88,6 +90,21 @@ public class FlushRequest extends BroadcastOperationRequest {
return this;
}
/**
* Force flushing, even if one is possibly not needed.
*/
public boolean force() {
return force;
}
/**
* Force flushing, even if one is possibly not needed.
*/
public FlushRequest force(boolean force) {
this.force = force;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -108,11 +125,13 @@ public class FlushRequest extends BroadcastOperationRequest {
super.writeTo(out);
out.writeBoolean(refresh);
out.writeBoolean(full);
out.writeBoolean(force);
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
full = in.readBoolean();
force = in.readBoolean();
}
}

View File

@ -31,8 +31,8 @@ import java.io.IOException;
class ShardFlushRequest extends BroadcastShardOperationRequest {
private boolean refresh;
private boolean full;
private boolean force;
ShardFlushRequest() {
}
@ -41,6 +41,7 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
super(index, shardId);
this.refresh = request.refresh();
this.full = request.full();
this.force = request.force();
}
public boolean refresh() {
@ -51,15 +52,21 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
return this.full;
}
public boolean force() {
return this.force;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
full = in.readBoolean();
force = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(refresh);
out.writeBoolean(full);
out.writeBoolean(force);
}
}

View File

@ -111,7 +111,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().refresh(request.refresh()).full(request.full()));
indexShard.flush(new Engine.Flush().refresh(request.refresh()).full(request.full()).force(request.force()));
return new ShardFlushResponse(request.index(), request.shardId());
}

View File

@ -185,6 +185,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private boolean full = false;
private boolean refresh = false;
private boolean force = false;
/**
* Should a refresh be performed after flushing. Defaults to <tt>false</tt>.
@ -216,8 +217,17 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
public boolean force() {
return this.force;
}
public Flush force(boolean force) {
this.force = force;
return this;
}
@Override public String toString() {
return "full[" + full + "], refresh[" + refresh + "]";
return "full[" + full + "], refresh[" + refresh + "], force[" + force + "]";
}
}

View File

@ -798,7 +798,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
indexWriter.close(false);
indexWriter = createWriter();
if (flushNeeded) {
if (flushNeeded || flush.force()) {
flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
@ -827,7 +827,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
if (flushNeeded) {
if (flushNeeded || flush.force()) {
flushNeeded = false;
try {
long translogId = translogIdGenerator.incrementAndGet();
@ -913,7 +913,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
@Override public void optimize(Optimize optimize) throws EngineException {
if (optimize.flush()) {
flush(new Flush());
flush(new Flush().force(true));
}
if (optimizeMutex.compareAndSet(false, true)) {
rwl.readLock().lock();
@ -950,7 +950,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
indexWriter.waitForMerges();
}
if (optimize.flush()) {
flush(new Flush());
flush(new Flush().force(true));
}
if (optimize.refresh()) {
refresh(new Refresh(false).force(true));

View File

@ -27,7 +27,12 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -63,6 +68,7 @@ public class RestFlushAction extends BaseRestHandler {
flushRequest.operationThreading(operationThreading);
flushRequest.refresh(request.paramAsBoolean("refresh", flushRequest.refresh()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
client.admin().indices().flush(flushRequest, new ActionListener<FlushResponse>() {
@Override public void onResponse(FlushResponse response) {
try {