From e1d56ea467af777626959e3e24b4976d4c16213c Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 26 Jan 2011 17:16:32 +0200 Subject: [PATCH] call maybeMerge outside of writeLock to reduce work done under writeLock --- .../index/engine/robin/RobinEngine.java | 25 ++++++++++++++++--- .../deps/lucene/SimpleLuceneTests.java | 19 ++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index ac3d683821b..2907fc151fa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -107,6 +107,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private volatile int disableFlushCounter = 0; + private final AtomicBoolean flushing = new AtomicBoolean(); + private final ConcurrentMap versionMap; private final Object[] dirtyLocks; @@ -621,6 +623,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, if (disableFlushCounter > 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); } + // don't allow for concurrent flush operations... + if (!flushing.compareAndSet(false, true)) { + throw new FlushNotAllowedEngineException(shardId, "Already flushing..."); + } + + // call maybeMerge outside of the write lock since it gets called anyhow within commit/refresh + // and we want not to suffer this cost within the write lock + // We can't do prepareCommit here, since we rely on the the segment version for the translog version + try { + indexWriter.maybeMerge(); + } catch (Exception e) { + flushing.set(false); + throw new FlushFailedEngineException(shardId, e); + } rwl.writeLock().lock(); try { if (indexWriter == null) { @@ -658,13 +674,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } versionMap.clear(); dirty = true; // force a refresh + // we need to do a refresh here so we sync versioning support refresh(new Refresh(true)); } finally { rwl.writeLock().unlock(); + flushing.set(false); } - if (flush.refresh()) { - refresh(new Refresh(false)); - } + // we flush anyhow before... +// if (flush.refresh()) { +// refresh(new Refresh(false)); +// } } @Override public void optimize(Optimize optimize) throws EngineException { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java index d53ce54f63d..d8c6980939a 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java @@ -44,6 +44,25 @@ import static org.hamcrest.Matchers.*; */ public class SimpleLuceneTests { + @Test public void testAddDocAfterPrepareCommit() throws Exception { + Directory dir = new RAMDirectory(); + IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED); + indexWriter.addDocument(doc() + .add(field("_id", "1")).build()); + IndexReader reader = indexWriter.getReader(); + assertThat(reader.numDocs(), equalTo(1)); + + indexWriter.prepareCommit(); + reader = indexWriter.getReader(); + assertThat(reader.numDocs(), equalTo(1)); + + indexWriter.addDocument(doc() + .add(field("_id", "2")).build()); + indexWriter.commit(); + reader = indexWriter.getReader(); + assertThat(reader.numDocs(), equalTo(2)); + } + @Test public void testSimpleNumericOps() throws Exception { Directory dir = new RAMDirectory(); IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED);