diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 702d411d6c6..885345e27a8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -581,7 +581,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private void startScheduledTasksIfNeeded() { if (refreshInterval.millis() > 0) { - refreshScheduledFuture = threadPool.schedule(new EngineRefresher(), refreshInterval, ThreadPool.ExecutionType.THREADED); + refreshScheduledFuture = threadPool.schedule(new EngineRefresher(), refreshInterval, ThreadPool.ExecutionType.DEFAULT); logger.debug("scheduling refresher every {}", refreshInterval); } else { logger.debug("scheduled refresher disabled"); @@ -606,28 +606,39 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private class EngineRefresher implements Runnable { @Override public void run() { - try { - if (engine.refreshNeeded()) { - engine.refresh(new Engine.Refresh(false)); + // we check before if a refresh is needed, if not, we reschedule, otherwise, we fork, refresh, and then reschedule + if (!engine().refreshNeeded()) { + if (state != IndexShardState.CLOSED) { + refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT); } - } catch (EngineClosedException e) { - // we are being closed, ignore - } catch (RefreshFailedEngineException e) { - if (e.getCause() instanceof InterruptedException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ClosedByInterruptException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ThreadInterruptedException) { - // ignore, we are being shutdown - } else { - logger.warn("Failed to perform scheduled engine refresh", e); + return; + } + threadPool.cached().execute(new Runnable() { + @Override public void run() { + try { + if (engine.refreshNeeded()) { + engine.refresh(new Engine.Refresh(false)); + } + } catch (EngineClosedException e) { + // we are being closed, ignore + } catch (RefreshFailedEngineException e) { + if (e.getCause() instanceof InterruptedException) { + // ignore, we are being shutdown + } else if (e.getCause() instanceof ClosedByInterruptException) { + // ignore, we are being shutdown + } else if (e.getCause() instanceof ThreadInterruptedException) { + // ignore, we are being shutdown + } else { + logger.warn("Failed to perform scheduled engine refresh", e); + } + } catch (Exception e) { + logger.warn("Failed to perform scheduled engine refresh", e); + } + if (state != IndexShardState.CLOSED) { + refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.DEFAULT); + } } - } catch (Exception e) { - logger.warn("Failed to perform scheduled engine refresh", e); - } - if (state != IndexShardState.CLOSED) { - refreshScheduledFuture = threadPool.schedule(this, refreshInterval, ThreadPool.ExecutionType.THREADED); - } + }); } }