diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
index 4cbd366f86d..4b551f8c1e7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -69,6 +69,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
*/
boolean refreshNeeded();
+ /**
+ * Returns true if a possible merge is really needed.
+ */
+ boolean possibleMergeNeeded();
+
+ void maybeMerge() throws EngineException;
+
/**
* Refreshes the engine for new search operations to reflect the latest
* changes. Pass true if the refresh operation should include
@@ -131,10 +138,21 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final boolean waitForOperations;
+ private boolean force = false;
+
public Refresh(boolean waitForOperations) {
this.waitForOperations = waitForOperations;
}
+ public Refresh force(boolean force) {
+ this.force = force;
+ return this;
+ }
+
+ public boolean force() {
+ return this.force;
+ }
+
public boolean waitForOperations() {
return waitForOperations;
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
index 19440398eb1..8a0c5342674 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
@@ -113,6 +113,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
+ private volatile boolean possibleMergeNeeded = false;
+
private volatile int disableFlushCounter = 0;
// indexing searcher is initialized
@@ -228,6 +230,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
innerCreate(create, writer);
dirty = true;
+ possibleMergeNeeded = true;
if (create.refresh()) {
refresh(new Refresh(false));
}
@@ -318,6 +321,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
innerIndex(index, writer);
dirty = true;
+ possibleMergeNeeded = true;
if (index.refresh()) {
refresh(new Refresh(false));
}
@@ -402,6 +406,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
innerDelete(delete, writer);
dirty = true;
+ possibleMergeNeeded = true;
if (delete.refresh()) {
refresh(new Refresh(false));
}
@@ -485,6 +490,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.deleteDocuments(delete.query());
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;
+ possibleMergeNeeded = true;
} catch (IOException e) {
throw new DeleteByQueryFailedEngineException(shardId, delete, e);
} finally {
@@ -521,6 +527,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
return dirty;
}
+ @Override public boolean possibleMergeNeeded() {
+ return this.possibleMergeNeeded;
+ }
+
@Override public void refresh(Refresh refresh) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
@@ -535,7 +545,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
throw new EngineClosedException(shardId);
}
try {
- if (dirty) {
+ if (dirty || refresh.force()) {
// we eagerly set dirty to false so we won't miss refresh requests
dirty = false;
AcquirableResource current = nrtResource;
@@ -586,6 +596,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
+ if (indexingSearcher.get() != null) {
+ indexingSearcher.get().release();
+ indexingSearcher.set(null);
+ }
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
@@ -613,11 +627,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
versionMap.clear();
dirty = true; // force a refresh
// we need to do a refresh here so we sync versioning support
- refresh(new Refresh(true));
- if (indexingSearcher.get() != null) {
- indexingSearcher.get().release();
- indexingSearcher.set(null);
- }
+ refresh(new Refresh(true).force(true));
} finally {
rwl.writeLock().unlock();
flushing.set(false);
@@ -628,6 +638,30 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// }
}
+ @Override public void maybeMerge() throws EngineException {
+ if (!possibleMergeNeeded) {
+ return;
+ }
+ possibleMergeNeeded = false;
+ rwl.readLock().lock();
+ try {
+ if (indexWriter == null) {
+ throw new EngineClosedException(shardId);
+ }
+ if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+ ((EnableMergePolicy) indexWriter.getMergePolicy()).enableMerge();
+ }
+ indexWriter.maybeMerge();
+ } catch (Exception e) {
+ throw new OptimizeFailedEngineException(shardId, e);
+ } finally {
+ rwl.readLock().unlock();
+ if (indexWriter != null && indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+ ((EnableMergePolicy) indexWriter.getMergePolicy()).disableMerge();
+ }
+ }
+ }
+
@Override public void optimize(Optimize optimize) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
rwl.readLock().lock();
@@ -642,6 +676,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
indexWriter.expungeDeletes(false);
} else if (optimize.maxNumSegments() <= 0) {
indexWriter.maybeMerge();
+ possibleMergeNeeded = false;
} else {
indexWriter.optimize(optimize.maxNumSegments(), false);
}
@@ -659,14 +694,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (optimize.waitForMerge()) {
indexWriter.waitForMerges();
}
- // once we did the optimization, we are "dirty" since we removed deletes potentially which
- // affects TermEnum
- dirty = true;
if (optimize.flush()) {
flush(new Flush());
}
if (optimize.refresh()) {
- refresh(new Refresh(false));
+ refresh(new Refresh(false).force(true));
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
index 7aeddcb538e..6f0d7398cb7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
@@ -562,7 +562,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
- mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, new EngineMerger());
+ mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
@@ -616,27 +616,36 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private class EngineMerger implements Runnable {
@Override public void run() {
- try {
- // -1 means maybe merge
- engine.optimize(new Engine.Optimize().maxNumSegments(-1).waitForMerge(false).flush(false).refresh(false));
- } catch (EngineClosedException e) {
- // we are being closed, ignore
- } catch (OptimizeFailedEngineException e) {
- if (e.getCause() instanceof InterruptedException) {
- // ignore, we are being shutdown
- } else if (e.getCause() instanceof ClosedByInterruptException) {
- // ignore, we are being shutdown
- } else if (e.getCause() instanceof ThreadInterruptedException) {
- // ignore, we are being shutdown
- } else {
- logger.warn("Failed to perform scheduled engine optimize/merge", e);
+ if (!engine().possibleMergeNeeded()) {
+ if (state != IndexShardState.CLOSED) {
+ mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
}
- } catch (Exception e) {
- logger.warn("Failed to perform scheduled engine optimize/merge", e);
- }
- if (state != IndexShardState.CLOSED) {
- mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, this);
+ return;
}
+ threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() {
+ @Override public void run() {
+ try {
+ engine.maybeMerge();
+ } catch (EngineClosedException e) {
+ // we are being closed, ignore
+ } catch (OptimizeFailedEngineException e) {
+ if (e.getCause() instanceof InterruptedException) {
+ // ignore, we are being shutdown
+ } else if (e.getCause() instanceof ClosedByInterruptException) {
+ // ignore, we are being shutdown
+ } else if (e.getCause() instanceof ThreadInterruptedException) {
+ // ignore, we are being shutdown
+ } else {
+ logger.warn("Failed to perform scheduled engine optimize/merge", e);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to perform scheduled engine optimize/merge", e);
+ }
+ if (state != IndexShardState.CLOSED) {
+ mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this);
+ }
+ }
+ });
}
}