From c32b9c3846ea416752b9ca3be4d1585d68d08b33 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 23 Sep 2015 13:56:10 +0200 Subject: [PATCH] Add back presumably redundant shouldFlush() check. The check prevents a race condition since we can't use real locks here. Relates to #13707 --- .../elasticsearch/index/shard/IndexShard.java | 45 +++++++++++-------- .../index/shard/IndexShardTests.java | 1 + 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6b61f6955e8..8e6e143cb81 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1496,26 +1496,35 @@ public class IndexShard extends AbstractIndexShardComponent { */ public boolean maybeFlush() { if (shouldFlush()) { - if (asyncFlushRunning.compareAndSet(false, true)) { - final AbstractRunnable abstractRunnable = new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - if (state != IndexShardState.CLOSED) { - logger.warn("failed to flush index", t); + if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread + if (shouldFlush() == false) { + // we have to check again since otherwise there is a race when a thread passes + // the first shouldFlush() check next to another thread which flushes fast enough + // to finish before the current thread could flip the asyncFlushRunning flag. + // in that situation we have an extra unexpected flush. + asyncFlushRunning.compareAndSet(true, false); + } else { + final AbstractRunnable abstractRunnable = new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to flush index", t); + } } - } - @Override - protected void doRun() throws Exception { - flush(new FlushRequest()); - } - @Override - public void onAfter() { - asyncFlushRunning.compareAndSet(true, false); - } - }; - threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); - return true; + @Override + protected void doRun() throws Exception { + flush(new FlushRequest()); + } + + @Override + public void onAfter() { + asyncFlushRunning.compareAndSet(true, false); + } + }; + threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); + return true; + } } } return false; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b586a18807d..09c6dfaea65 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -761,6 +761,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { for (int i = 0; i < threads.length; i++) { threads[i].join(); } + assertEquals(total + 1, shard.flushStats().getTotal()); } }