[ENGINE] Remove engine related command classes

Todaqy we pass structs to the engine to call optimize / refresh and flush.
This commit cleans up this logic to reduce complexity in the engine.
This commit is contained in:
Simon Willnauer 2014-12-11 12:38:47 +01:00
parent 80bd69811d
commit ba881a9b58
23 changed files with 167 additions and 301 deletions

View File

@ -45,7 +45,6 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
private boolean waitIfOngoing = false;
FlushRequest() {
}
/**

View File

@ -31,46 +31,30 @@ import java.io.IOException;
*
*/
class ShardFlushRequest extends BroadcastShardOperationRequest {
private boolean full;
private boolean force;
private boolean waitIfOngoing;
private FlushRequest request = new FlushRequest();
ShardFlushRequest() {
}
ShardFlushRequest(ShardId shardId, FlushRequest request) {
super(shardId, request);
this.full = request.full();
this.force = request.force();
this.waitIfOngoing = request.waitIfOngoing();
this.request = request;
}
public boolean full() {
return this.full;
}
public boolean force() {
return this.force;
}
public boolean waitIfOngoing() {
return waitIfOngoing;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
full = in.readBoolean();
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(full);
out.writeBoolean(force);
out.writeBoolean(waitIfOngoing);
request.writeTo(out);
}
FlushRequest getRequest() {
return request;
}
}

View File

@ -107,8 +107,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.flush(new Engine.Flush().waitIfOngoing(request.waitIfOngoing()).
type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
indexShard.flush(request.getRequest());
return new ShardFlushResponse(request.shardId());
}

View File

@ -166,4 +166,15 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
out.writeBoolean(flush);
out.writeBoolean(upgrade);
}
@Override
public String toString() {
return "OptimizeRequest{" +
"waitForMerge=" + waitForMerge +
", maxNumSegments=" + maxNumSegments +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
", upgrade=" + upgrade +
'}';
}
}

View File

@ -31,63 +31,31 @@ import java.io.IOException;
/**
*
*/
class ShardOptimizeRequest extends BroadcastShardOperationRequest {
final class ShardOptimizeRequest extends BroadcastShardOperationRequest {
private boolean waitForMerge = OptimizeRequest.Defaults.WAIT_FOR_MERGE;
private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = OptimizeRequest.Defaults.FLUSH;
private boolean upgrade = OptimizeRequest.Defaults.UPGRADE;
private OptimizeRequest request = new OptimizeRequest();
ShardOptimizeRequest() {
}
ShardOptimizeRequest(ShardId shardId, OptimizeRequest request) {
super(shardId, request);
waitForMerge = request.waitForMerge();
maxNumSegments = request.maxNumSegments();
onlyExpungeDeletes = request.onlyExpungeDeletes();
flush = request.flush();
upgrade = request.upgrade();
}
boolean waitForMerge() {
return waitForMerge;
}
int maxNumSegments() {
return maxNumSegments;
}
public boolean onlyExpungeDeletes() {
return onlyExpungeDeletes;
}
public boolean flush() {
return flush;
}
public boolean upgrade() {
return upgrade;
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForMerge = in.readBoolean();
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
upgrade = in.readBoolean();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForMerge);
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
out.writeBoolean(upgrade);
request.writeTo(out);
}
public OptimizeRequest optimizeRequest() {
return this.request;
}
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -108,13 +107,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
@Override
protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.optimize(new Engine.Optimize()
.waitForMerge(request.waitForMerge())
.maxNumSegments(request.maxNumSegments())
.onlyExpungeDeletes(request.onlyExpungeDeletes())
.flush(request.flush())
.upgrade(request.upgrade())
);
indexShard.optimize(request.optimizeRequest());
return new ShardOptimizeResponse(request.shardId());
}

View File

@ -108,7 +108,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
@Override
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.refresh(new Engine.Refresh("api").force(request.force()));
indexShard.refresh("api", request.force());
logger.trace("{} refresh request executed, force: [{}]", indexShard.shardId(), request.force());
return new ShardRefreshResponse(request.shardId());
}

View File

@ -355,7 +355,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_bulk").force(false));
indexShard.refresh("refresh_flag_bulk", false);
} catch (Throwable e) {
// ignore
}
@ -618,7 +618,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_bulk").force(false));
indexShard.refresh("refresh_flag_bulk", false);
} catch (Throwable e) {
// ignore
}

View File

@ -179,7 +179,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
indexShard.refresh("refresh_flag_delete", false);
} catch (Exception e) {
// ignore
}
@ -199,7 +199,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
indexShard.refresh("refresh_flag_delete", false);
} catch (Exception e) {
// ignore
}

View File

@ -91,7 +91,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
indexShard.refresh("refresh_flag_delete", false);
} catch (Exception e) {
// ignore
}
@ -117,7 +117,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
indexShard.refresh("refresh_flag_delete", false);
} catch (Exception e) {
// ignore
}

View File

@ -91,7 +91,7 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
IndexShard indexShard = indexService.shardSafe(shardId.id());
if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh("refresh_flag_get").force(REFRESH_FORCE));
indexShard.refresh("refresh_flag_get", REFRESH_FORCE);
}
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),

View File

@ -99,7 +99,7 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
IndexShard indexShard = indexService.shardSafe(shardId.id());
if (request.refresh() && !request.realtime()) {
indexShard.refresh(new Engine.Refresh("refresh_flag_mget").force(TransportGetAction.REFRESH_FORCE));
indexShard.refresh("refresh_flag_mget", TransportGetAction.REFRESH_FORCE);
}
MultiGetShardResponse response = new MultiGetShardResponse();

View File

@ -207,7 +207,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
indexShard.refresh("refresh_flag_index", false);
} catch (Throwable e) {
// ignore
}
@ -239,7 +239,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
indexShard.refresh("refresh_flag_index", false);
} catch (Exception e) {
// ignore
}

View File

@ -110,26 +110,24 @@ public interface Engine extends CloseableComponent {
*/
boolean refreshNeeded();
/**
* Returns <tt>true</tt> if a possible merge is really needed.
*/
boolean possibleMergeNeeded();
void maybeMerge() throws EngineException;
/**
* Refreshes the engine for new search operations to reflect the latest
* changes. Pass <tt>true</tt> if the refresh operation should include
* all the operations performed up to this call.
*/
void refresh(Refresh refresh) throws EngineException;
void refresh(String source, boolean force) throws EngineException;
/**
* Flushes the state of the engine, clearing memory.
*/
void flush(Flush flush) throws EngineException, FlushNotAllowedEngineException;
void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException;
void optimize(Optimize optimize) throws EngineException;
/**
* Optimizes to 1 segment
*/
void forceMerge(boolean flush, boolean waitForMerge);
void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
@ -210,96 +208,19 @@ public interface Engine extends CloseableComponent {
}
}
static class Refresh {
private final String source;
private boolean force = false;
public Refresh(String source) {
this.source = source;
}
public static enum FlushType {
/**
* Forces calling refresh, overriding the check that dirty operations even happened. Defaults
* to true (note, still lightweight if no refresh is needed).
* A flush that causes a new writer to be created.
*/
public Refresh force(boolean force) {
this.force = force;
return this;
}
public boolean force() {
return this.force;
}
public String source() {
return this.source;
}
@Override
public String toString() {
return "force[" + force + "], source [" + source + "]";
}
}
static class Flush {
public static enum Type {
/**
* A flush that causes a new writer to be created.
*/
NEW_WRITER,
/**
* A flush that just commits the writer, without cleaning the translog.
*/
COMMIT,
/**
* A flush that does a commit, as well as clears the translog.
*/
COMMIT_TRANSLOG
}
private Type type = Type.COMMIT_TRANSLOG;
private boolean force = false;
NEW_WRITER,
/**
* Should the flush operation wait if there is an ongoing flush operation.
* A flush that just commits the writer, without cleaning the translog.
*/
private boolean waitIfOngoing = false;
public Type type() {
return this.type;
}
COMMIT,
/**
* Should a "full" flush be issued, basically cleaning as much memory as possible.
* A flush that does a commit, as well as clears the translog.
*/
public Flush type(Type type) {
this.type = type;
return this;
}
public boolean force() {
return this.force;
}
public Flush force(boolean force) {
this.force = force;
return this;
}
public boolean waitIfOngoing() {
return this.waitIfOngoing;
}
public Flush waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;
return this;
}
@Override
public String toString() {
return "type[" + type + "], force[" + force + "]";
}
COMMIT_TRANSLOG
}
static class Optimize {

View File

@ -25,7 +25,6 @@ import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
@ -223,7 +222,7 @@ public class InternalEngine implements Engine {
if (indexingBufferSize == Engine.INACTIVE_SHARD_INDEXING_BUFFER && preValue != Engine.INACTIVE_SHARD_INDEXING_BUFFER) {
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize);
try {
flush(new Flush().type(Flush.Type.COMMIT));
flush(FlushType.COMMIT, false, false);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
@ -379,7 +378,7 @@ public class InternalEngine implements Engine {
} finally {
if (requiresFlushing) {
flush(new Flush().type(Flush.Type.NEW_WRITER));
flush(FlushType.NEW_WRITER, false, false);
}
}
}
@ -564,7 +563,7 @@ public class InternalEngine implements Engine {
threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
public void run() {
try {
refresh(new Refresh("version_table_full"));
refresh("version_table_full", false);
} catch (EngineClosedException ex) {
// ignore
}
@ -731,7 +730,7 @@ public class InternalEngine implements Engine {
// TODO: This is heavy, since we refresh, but we must do this because we don't know which documents were in fact deleted (i.e., our
// versionMap isn't updated), so we must force a cutover to a new reader to "see" the deletions:
refresh(new Refresh("delete_by_query").force(true));
refresh("delete_by_query", true);
}
@Override
@ -807,8 +806,7 @@ public class InternalEngine implements Engine {
return false;
}
@Override
public boolean possibleMergeNeeded() {
private boolean possibleMergeNeeded() {
IndexWriter writer = this.indexWriter;
if (writer == null) {
return false;
@ -819,7 +817,7 @@ public class InternalEngine implements Engine {
}
@Override
public void refresh(Refresh refresh) throws EngineException {
public void refresh(String source, boolean force) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
@ -830,7 +828,7 @@ public class InternalEngine implements Engine {
// maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
// but, we want to make sure not to loose ant refresh calls, if one is taking time
synchronized (refreshMutex) {
if (refreshNeeded() || refresh.force()) {
if (refreshNeeded() || force) {
// we set dirty to false, even though the refresh hasn't happened yet
// as the refresh only holds for data indexed before it. Any data indexed during
// the refresh will not be part of it and will set the dirty flag back to true
@ -856,23 +854,23 @@ public class InternalEngine implements Engine {
}
@Override
public void flush(Flush flush) throws EngineException {
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) {
if (type == FlushType.NEW_WRITER || type == FlushType.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + flush.type() + "] is not allowed");
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + type + "] is not allowed");
}
}
int currentFlushing = flushing.incrementAndGet();
if (currentFlushing > 1 && !flush.waitIfOngoing()) {
if (currentFlushing > 1 && waitIfOngoing == false) {
flushing.decrementAndGet();
throw new FlushNotAllowedEngineException(shardId, "already flushing...");
}
flushLock.lock();
try {
if (flush.type() == Flush.Type.NEW_WRITER) {
if (type == FlushType.NEW_WRITER) {
try (InternalLock _ = writeLock.acquire()) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
@ -889,7 +887,7 @@ public class InternalEngine implements Engine {
indexWriter = createWriter();
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
if (flushNeeded || flush.force()) {
if (flushNeeded || force) {
flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
@ -913,14 +911,14 @@ public class InternalEngine implements Engine {
throw new FlushFailedEngineException(shardId, t);
}
}
} else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
} else if (type == FlushType.COMMIT_TRANSLOG) {
try (InternalLock _ = readLock.acquire()) {
final IndexWriter indexWriter = currentIndexWriter();
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
if (flushNeeded || flush.force()) {
if (flushNeeded || force) {
flushNeeded = false;
try {
long translogId = translogIdGenerator.incrementAndGet();
@ -928,7 +926,7 @@ public class InternalEngine implements Engine {
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
// we need to refresh in order to clear older version values
refresh(new Refresh("version_table_flush").force(true));
refresh("version_table_flush", true);
// we need to move transient to current only after we refresh
// so items added to current will still be around for realtime get
// when tans overrides it
@ -951,7 +949,7 @@ public class InternalEngine implements Engine {
pruneDeletedTombstones();
}
} else if (flush.type() == Flush.Type.COMMIT) {
} else if (type == FlushType.COMMIT) {
// note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
// translog on an index that was opened on a committed point in time that is "in the future"
// of that translog
@ -976,7 +974,7 @@ public class InternalEngine implements Engine {
}
} else {
throw new ElasticsearchIllegalStateException("flush type [" + flush.type() + "] not supported");
throw new ElasticsearchIllegalStateException("flush type [" + type + "] not supported");
}
// reread the last committed segment infos
@ -1042,8 +1040,7 @@ public class InternalEngine implements Engine {
lastDeleteVersionPruneTimeMSec = timeMSec;
}
@Override
public void maybeMerge() throws EngineException {
private void maybeMerge() throws EngineException {
if (!possibleMergeNeeded()) {
return;
}
@ -1066,12 +1063,17 @@ public class InternalEngine implements Engine {
throw new OptimizeFailedEngineException(shardId, e);
}
if (flushAfter) {
flush(new Flush().force(true).waitIfOngoing(true));
flush(FlushType.COMMIT_TRANSLOG, true, true);
}
}
@Override
public void optimize(Optimize optimize) throws EngineException {
public void forceMerge(boolean flush, boolean waitForMerge) {
forceMerge(flush, waitForMerge, 1, false, false);
}
@Override
public void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
try (InternalLock _ = readLock.acquire()) {
final IndexWriter writer = currentIndexWriter();
@ -1085,17 +1087,17 @@ public class InternalEngine implements Engine {
*/
MergePolicy mp = writer.getConfig().getMergePolicy();
assert mp instanceof ElasticsearchMergePolicy : "MergePolicy is " + mp.getClass().getName();
if (optimize.upgrade()) {
if (upgrade) {
((ElasticsearchMergePolicy) mp).setUpgradeInProgress(true);
}
if (optimize.onlyExpungeDeletes()) {
if (onlyExpungeDeletes) {
writer.forceMergeDeletes(false);
} else if (optimize.maxNumSegments() <= 0) {
} else if (maxNumSegments <= 0) {
writer.maybeMerge();
possibleMergeNeeded = false;
} else {
writer.forceMerge(optimize.maxNumSegments(), false);
writer.forceMerge(maxNumSegments, false);
}
} catch (Throwable t) {
maybeFailEngine(t, "optimize");
@ -1106,9 +1108,9 @@ public class InternalEngine implements Engine {
}
// wait for the merges outside of the read lock
if (optimize.waitForMerge()) {
waitForMerges(optimize.flush());
} else if (optimize.flush()) {
if (waitForMerge) {
waitForMerges(flush);
} else if (flush) {
// we only need to monitor merges for async calls if we are going to flush
threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
@ -1129,7 +1131,7 @@ public class InternalEngine implements Engine {
public SnapshotIndexCommit snapshotIndex() throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
flush(FlushType.COMMIT, false, true);
try (InternalLock _ = readLock.acquire()) {
ensureOpen();
return deletionPolicy.snapshot();

View File

@ -284,28 +284,23 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
}
@Override
public boolean possibleMergeNeeded() {
return engineSafe().possibleMergeNeeded();
public void refresh(String source, boolean force) throws EngineException {
engineSafe().refresh(source, force);
}
@Override
public void maybeMerge() throws EngineException {
engineSafe().maybeMerge();
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException, FlushNotAllowedEngineException {
engineSafe().flush(type, force, waitIfOngoing);
}
@Override
public void refresh(Refresh refresh) throws EngineException {
engineSafe().refresh(refresh);
public void forceMerge(boolean flush, boolean waitForMerge) {
engineSafe().forceMerge(flush, waitForMerge);
}
@Override
public void flush(Flush flush) throws EngineException, FlushNotAllowedEngineException {
engineSafe().flush(flush);
}
@Override
public void optimize(Optimize optimize) throws EngineException {
engineSafe().optimize(optimize);
public void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
engineSafe().forceMerge(flush, waitForMerge, maxNumSegments, onlyExpungeDeletes, upgrade);
}
@Override

View File

@ -145,7 +145,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
indexShard.postRecovery("post recovery from gateway");
}
// refresh the shard
indexShard.refresh(new Engine.Refresh("post_gateway").force(true));
indexShard.refresh("post_gateway", true);
recoveryState.getTimer().time(System.currentTimeMillis() - recoveryState.getTimer().startTime());
recoveryState.setStage(RecoveryState.Stage.DONE);

View File

@ -265,7 +265,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
}
private int loadQueries(IndexShard shard) {
shard.refresh(new Engine.Refresh("percolator_load_queries").force(true));
shard.refresh("percolator_load_queries", true);
// Maybe add a mode load? This isn't really a write. We need write b/c state=post_recovery
try (Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries", IndexShard.Mode.WRITE)) {
Query query = new ConstantScoreQuery(

View File

@ -20,6 +20,8 @@
package org.elasticsearch.index.shard.service;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
@ -155,13 +157,11 @@ public interface IndexShard extends IndexShardComponent {
Engine.GetResult get(Engine.Get get) throws ElasticsearchException;
void refresh(Engine.Refresh refresh) throws ElasticsearchException;
void refresh(String source, boolean force) throws ElasticsearchException;
void flush(Engine.Flush flush) throws ElasticsearchException;
void flush(FlushRequest request) throws ElasticsearchException;
void optimize(Engine.Optimize optimize) throws ElasticsearchException;
SnapshotIndexCommit snapshotIndex() throws EngineException;
void optimize(OptimizeRequest optimize) throws ElasticsearchException;
void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException;
@ -178,8 +178,6 @@ public interface IndexShard extends IndexShardComponent {
void readAllowed() throws IllegalIndexShardStateException;
void readAllowed(Mode mode) throws IllegalIndexShardStateException;
ShardId shardId();
public enum Mode {

View File

@ -31,6 +31,8 @@ import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -59,7 +61,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.engine.OptimizeFailedEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@ -324,7 +325,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
engine.refresh(new Engine.Refresh("cluster_state_started").force(true));
engine.refresh("cluster_state_started", true);
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
@ -515,13 +516,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
@Override
public void refresh(Engine.Refresh refresh) throws ElasticsearchException {
public void refresh(String source, boolean force) throws ElasticsearchException {
verifyNotClosed();
if (logger.isTraceEnabled()) {
logger.trace("refresh with {}", refresh);
logger.trace("refresh with soruce: {} force: {}", source, force);
}
long time = System.nanoTime();
engine.refresh(refresh);
engine.refresh(source, force);
refreshMetric.inc(System.nanoTime() - time);
}
@ -641,29 +642,29 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
@Override
public void flush(Engine.Flush flush) throws ElasticsearchException {
public void flush(FlushRequest request) throws ElasticsearchException {
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
verifyStartedOrRecovering();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", flush);
logger.trace("flush with {}", request);
}
long time = System.nanoTime();
engine.flush(flush);
engine.flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
flushMetric.inc(System.nanoTime() - time);
}
@Override
public void optimize(Engine.Optimize optimize) throws ElasticsearchException {
public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
verifyStarted();
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
engine.optimize(optimize);
engine.forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
}
@Override
public SnapshotIndexCommit snapshotIndex() throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
@ -768,11 +769,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
if (withFlush) {
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
}
// clear unreferenced files
translog.clearUnreferenced();
engine.refresh(new Engine.Refresh("recovery_finalization").force(true));
engine.refresh("recovery_finalization", true);
synchronized (mutex) {
changeState(IndexShardState.POST_RECOVERY, "post recovery");
}
@ -970,7 +971,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void run() {
try {
if (engine.refreshNeeded()) {
refresh(new Engine.Refresh("scheduled").force(false));
refresh("schedule", false);
}
} catch (EngineClosedException e) {
// we are being closed, ignore

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -199,7 +200,7 @@ public class TranslogService extends AbstractIndexShardComponent {
@Override
public void run() {
try {
indexShard.flush(new Engine.Flush());
indexShard.flush(new FlushRequest());
} catch (IllegalIndexShardStateException e) {
// we are being closed, or in created state, ignore
} catch (FlushNotAllowedEngineException e) {

View File

@ -26,7 +26,6 @@ import org.apache.log4j.Logger;
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
@ -40,7 +39,6 @@ import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
@ -66,7 +64,6 @@ import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
@ -256,7 +253,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
segments = engine.segments();
assertThat(segments.size(), equalTo(1));
@ -273,7 +270,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
segments = engine.segments();
assertThat(segments.size(), equalTo(1));
@ -288,7 +285,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
@ -314,7 +311,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
@ -335,7 +332,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, true).build());
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
segments = engine.segments();
assertThat(segments.size(), equalTo(3));
@ -421,18 +418,18 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat(engine.segments().size(), equalTo(1));
index = new Engine.Index(null, newUid("2"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat(engine.segments().size(), equalTo(2));
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
index = new Engine.Index(null, newUid("3"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat(engine.segments().size(), equalTo(3));
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
@ -440,7 +437,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
engine.optimize(new Engine.Optimize().maxNumSegments(1).waitForMerge(false));
engine.forceMerge(false, false);
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
@ -451,10 +448,10 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(true));
engine.forceMerge(true, true);
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
@ -464,7 +461,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).waitForMerge(false));
engine.forceMerge(flush, false);
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
@ -519,7 +516,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// refresh and it should be there
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
// now its there...
searchResult = engine.acquireSearcher("test");
@ -555,7 +552,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
getResult.release();
// refresh and it should be updated
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
@ -579,7 +576,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
getResult.release();
// refresh and it should be deleted
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
@ -601,7 +598,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
searchResult.close();
// refresh and it should be there
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
// now its there...
searchResult = engine.acquireSearcher("test");
@ -611,7 +608,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
searchResult.close();
// now flush
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
// and, verify get (in real time)
getResult = engine.get(new Engine.Get(true, newUid("1")));
@ -635,7 +632,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
searchResult.close();
// refresh and it should be updated
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
@ -663,7 +660,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
searchResult.close();
// refresh and it should be there
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
// now its there...
searchResult = engine.acquireSearcher("test");
@ -673,7 +670,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
// delete, refresh and do a new search, it should not be there
engine.delete(new Engine.Delete("test", "1", newUid("1")));
engine.refresh(new Engine.Refresh("test").force(false));
engine.refresh("test", false);
Engine.Searcher updateSearchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
updateSearchResult.close();
@ -688,7 +685,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, false);
final int failInPhase = randomIntBetween(1, 3);
try {
@ -726,7 +723,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh(new Engine.Refresh("foo"));
engine.refresh("foo", false);
searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 2));
@ -743,13 +740,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void testSimpleRecover() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.recover(new Engine.RecoveryHandler() {
@Override
public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
try {
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat("flush is not allowed in phase 3", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
@ -760,7 +757,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void phase2(Translog.Snapshot snapshot) throws EngineException {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
try {
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat("flush is not allowed in phase 3", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
@ -772,7 +769,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
try {
// we can do this here since we are on the same thread
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertThat("flush is not allowed in phase 3", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
@ -780,7 +777,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
});
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.close();
}
@ -788,7 +785,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
@ -811,7 +808,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
});
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.close();
}
@ -819,7 +816,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));
@ -849,7 +846,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
});
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.close();
}
@ -961,7 +958,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.index(index);
assertThat(index.version(), equalTo(2l));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, PRIMARY, 0);
try {
@ -992,7 +989,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.index(index);
assertThat(index.version(), equalTo(14l));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
try {
@ -1065,7 +1062,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.index(index);
assertThat(index.version(), equalTo(2l));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false);
try {
@ -1084,14 +1081,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
// all is well
}
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
// now actually delete
delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertThat(delete.version(), equalTo(3l));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
// now check if we can index to a delete doc with version
index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
@ -1134,7 +1131,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.create(create);
assertThat(create.version(), equalTo(1l));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
try {
@ -1258,7 +1255,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@ -1309,13 +1306,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertFalse(mockAppender.sawIndexWriterMessage);
// Again, with TRACE, which should log IndexWriter output:
rootLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(null, newUid("2"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertTrue(mockAppender.sawIndexWriterMessage);
} finally {
@ -1344,14 +1341,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertFalse(mockAppender.sawIndexWriterMessage);
assertFalse(mockAppender.sawIndexWriterIFDMessage);
// Again, with TRACE, which should only log IndexWriter IFD output:
iwIFDLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(null, newUid("2"), doc));
engine.flush(new Engine.Flush());
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
assertFalse(mockAppender.sawIndexWriterMessage);
assertTrue(mockAppender.sawIndexWriterIFDMessage);
@ -1400,7 +1397,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
Thread.sleep(1000);
if (randomBoolean()) {
engine.refresh(new Engine.Refresh("test"));
engine.refresh("test", false);
}
// Delete non-existent document

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -87,11 +88,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
// When the the internal engine closes we do a rollback, which removes uncommitted segments
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
// so that even in tests where don't flush we can check the integrity of the Lucene index
indexShard.flush(
new Engine.Flush()
.type(Engine.Flush.Type.COMMIT) // Keep translog for tests that rely on replaying it
.waitIfOngoing(true)
);
((InternalIndexShard)indexShard).engine().flush(Engine.FlushType.COMMIT, false, true); // Keep translog for tests that rely on replaying it
logger.info("flush finished in beforeIndexShardClosed");
}
}