From d96ffe915390bbc925170cea586c2c6b6a3fcc4e Mon Sep 17 00:00:00 2001
From: kimchy
Date: Tue, 14 Sep 2010 01:46:44 +0200
Subject: [PATCH] internal bulk operation on the index shard

---
 .../index/engine/SimpleEngineBenchmark.java | 11 ++-
 .../elasticsearch/index/engine/Engine.java  | 93 +++++++++++++------
 .../index/engine/robin/RobinEngine.java     | 52 +++++++++++
 .../index/shard/service/IndexShard.java     | 16 +++-
 .../shard/service/InternalIndexShard.java   | 75 ++++++++-------
 .../engine/AbstractSimpleEngineTests.java   | 77 ++++++++++++---
 6 files changed, 242 insertions(+), 82 deletions(-)

diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
index 98feef3b657..bf2e8818b98 100644
--- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
+++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
@@ -36,6 +36,7 @@ import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.robin.RobinEngine;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
 import org.elasticsearch.index.shard.ShardId;
@@ -163,10 +164,11 @@ public class SimpleEngineBenchmark {
                 String sId = Integer.toString(id);
                 Document doc = doc().add(field("_id", sId))
                         .add(field("content", contentItem)).build();
+                ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, TRANSLOG_PAYLOAD, false);
                 if (create) {
-                    engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
+                    engine.create(new Engine.Create(pDoc, Lucene.STANDARD_ANALYZER));
                 } else {
-                    engine.index(new Engine.Index(new Term("_id", sId), doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
+                    engine.index(new Engine.Index(new Term("_id", sId), pDoc, Lucene.STANDARD_ANALYZER));
                 }
             }
             engine.refresh(new Engine.Refresh(true));
@@ -276,10 +278,11 @@ public class SimpleEngineBenchmark {
                     String sId = Integer.toString(id);
                     Document doc = doc().add(field("_id", sId))
                             .add(field("content", content(id))).build();
+                    ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, TRANSLOG_PAYLOAD, false);
                     if (create) {
-                        engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
+                        engine.create(new Engine.Create(pDoc, Lucene.STANDARD_ANALYZER));
                     } else {
-                        engine.index(new Engine.Index(new Term("_id", sId), doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
+                        engine.index(new Engine.Index(new Term("_id", sId), pDoc, Lucene.STANDARD_ANALYZER));
                     }
                 }
             } catch (Exception e) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
index bdfec362f3b..28405ba3274 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -31,6 +31,7 @@ import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.ThreadSafe;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.shard.IndexShardComponent;
 import org.elasticsearch.index.translog.Translog;
@@ -52,6 +53,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
      */
     void start() throws EngineException;

+    EngineException[] bulk(Bulk bulk) throws EngineException;
+
     void create(Create create) throws EngineException;

     void index(Index index) throws EngineException;
@@ -237,31 +240,55 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
         }
     }

-    static class Create {
-        private final Document document;
-        private final Analyzer analyzer;
-        private final String type;
-        private final String id;
-        private final byte[] source;
+    static interface Operation {
+        static enum Type {
+            CREATE,
+            INDEX,
+            DELETE
+        }

-        public Create(Document document, Analyzer analyzer, String type, String id, byte[] source) {
-            this.document = document;
+        Type opType();
+    }
+
+    static class Bulk {
+        private final Operation[] ops;
+
+        public Bulk(Operation[] ops) {
+            this.ops = ops;
+        }
+
+        public Operation[] ops() {
+            return this.ops;
+        }
+    }
+
+    static class Create implements Operation {
+        private final ParsedDocument doc;
+        private final Analyzer analyzer;
+
+        public Create(ParsedDocument doc, Analyzer analyzer) {
+            this.doc = doc;
             this.analyzer = analyzer;
-            this.type = type;
-            this.id = id;
-            this.source = source;
+        }
+
+        @Override public Type opType() {
+            return Type.CREATE;
+        }
+
+        public ParsedDocument parsedDoc() {
+            return this.doc;
         }

         public String type() {
-            return this.type;
+            return this.doc.type();
         }

         public String id() {
-            return this.id;
+            return this.doc.id();
         }

         public Document doc() {
-            return this.document;
+            return this.doc.doc();
         }

         public Analyzer analyzer() {
@@ -269,33 +296,35 @@
         }

         public byte[] source() {
-            return this.source;
+            return this.doc.source();
         }
     }

-    static class Index {
+    static class Index implements Operation {
         private final Term uid;
-        private final Document document;
+        private final ParsedDocument doc;
         private final Analyzer analyzer;
-        private final String type;
-        private final String id;
-        private final byte[] source;

-        public Index(Term uid, Document document, Analyzer analyzer, String type, String id, byte[] source) {
+        public Index(Term uid, ParsedDocument doc, Analyzer analyzer) {
             this.uid = uid;
-            this.document = document;
+            this.doc = doc;
             this.analyzer = analyzer;
-            this.type = type;
-            this.id = id;
-            this.source = source;
+        }
+
+        @Override public Type opType() {
+            return Type.INDEX;
         }

         public Term uid() {
             return this.uid;
         }

+        public ParsedDocument parsedDoc() {
+            return this.doc;
+        }
+
         public Document doc() {
-            return this.document;
+            return this.doc.doc();
         }

         public Analyzer analyzer() {
@@ -303,25 +332,29 @@
         }

         public String id() {
-            return this.id;
+            return this.doc.id();
         }

         public String type() {
-            return this.type;
+            return this.doc.type();
         }

         public byte[] source() {
-            return this.source;
+            return this.doc.source();
         }
     }

-    static class Delete {
+    static class Delete implements Operation {
         private final Term uid;

         public Delete(Term uid) {
             this.uid = uid;
         }

+        @Override public Type opType() {
+            return Type.DELETE;
+        }
+
         public Term uid() {
             return this.uid;
         }
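[editor's note, not part of the patch] The hunk above turns Create/Index/Delete into implementations of a common Operation interface, wraps them in a Bulk container, and adds an Engine#bulk(Bulk) entry point. A rough sketch of how a caller might drive it; the ParsedDocument values and the Analyzer are assumed to come from the usual DocumentMapper#parse path, and the helper name is illustrative only:

    // Sketch only: build a mixed batch and apply it through the new bulk entry point.
    static EngineException[] bulkSketch(Engine engine, ParsedDocument doc1, ParsedDocument doc2, Analyzer analyzer) {
        Engine.Operation[] ops = new Engine.Operation[]{
                new Engine.Create(doc1, analyzer),                               // add a new document
                new Engine.Index(new Term("_uid", doc2.uid()), doc2, analyzer),  // add or replace by uid
                new Engine.Delete(new Term("_uid", doc1.uid()))                  // delete by uid term
        };
        // bulk() returns null when every operation succeeded; otherwise the returned array is
        // aligned with ops[] and carries the per-operation failure (null for the ones that worked).
        return engine.bulk(new Engine.Bulk(ops));
    }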
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
index 462a965c7bb..2a17da9f43c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
@@ -179,6 +179,58 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         return refreshInterval;
     }

+    @Override public EngineException[] bulk(Bulk bulk) throws EngineException {
+        EngineException[] failures = null;
+        rwl.readLock().lock();
+        try {
+            IndexWriter writer = this.indexWriter;
+            if (writer == null) {
+                throw new EngineClosedException(shardId);
+            }
+            for (int i = 0; i < bulk.ops().length; i++) {
+                Operation op = bulk.ops()[i];
+                try {
+                    switch (op.opType()) {
+                        case CREATE:
+                            Create create = (Create) op;
+                            writer.addDocument(create.doc(), create.analyzer());
+                            translog.add(new Translog.Create(create));
+                            break;
+                        case INDEX:
+                            Index index = (Index) op;
+                            writer.updateDocument(index.uid(), index.doc(), index.analyzer());
+                            translog.add(new Translog.Index(index));
+                            break;
+                        case DELETE:
+                            Delete delete = (Delete) op;
+                            writer.deleteDocuments(delete.uid());
+                            translog.add(new Translog.Delete(delete));
+                            break;
+                    }
+                } catch (Exception e) {
+                    if (failures == null) {
+                        failures = new EngineException[bulk.ops().length];
+                    }
+                    switch (op.opType()) {
+                        case CREATE:
+                            failures[i] = new CreateFailedEngineException(shardId, (Create) op, e);
+                            break;
+                        case INDEX:
+                            failures[i] = new IndexFailedEngineException(shardId, (Index) op, e);
+                            break;
+                        case DELETE:
+                            failures[i] = new DeleteFailedEngineException(shardId, (Delete) op, e);
+                            break;
+                    }
+                }
+            }
+            dirty = true;
+        } finally {
+            rwl.readLock().unlock();
+        }
+        return failures;
+    }
+
     @Override public void create(Create create) throws EngineException {
         rwl.readLock().lock();
         try {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java
index 45ca71fbc22..579973c152c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java
@@ -48,13 +48,25 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
      */
     ByteSizeValue estimateFlushableMemorySize() throws ElasticSearchException;

+    Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException;
+
     ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException;

+    ParsedDocument create(Engine.Create create) throws ElasticSearchException;
+
+    Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException;
+
+    ParsedDocument index(Engine.Index index) throws ElasticSearchException;
+
     ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException;

-    void delete(String type, String id);
+    Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException;

-    void delete(Term uid);
+    void delete(Engine.Delete delete) throws ElasticSearchException;
+
+    void delete(String type, String id) throws ElasticSearchException;
+
+    void delete(Term uid) throws ElasticSearchException;

     void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
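[editor's note, not part of the patch] With the interface change above, every shard write is split into a prepare step, which only runs the source through the type's DocumentMapper, and an execute step, which checks writeAllowed() and hands the prepared operation to the engine. A hedged sketch of the call pattern; the shard instance, type, id and source bytes are illustrative:

    // Sketch only: prepare first (mapping/parse), execute later.
    static void prepareThenExecute(IndexShard indexShard) {
        byte[] source = "{\"value\" : \"test\"}".getBytes();
        Engine.Index op = indexShard.prepareIndex("type1", "1", source); // parses, but writes nothing yet
        ParsedDocument doc = indexShard.index(op);                       // writes to the engine and the translog
        // the one-shot variants remain and now simply compose the two steps, e.g.:
        indexShard.delete(indexShard.prepareDelete("type1", "1"));
    }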
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
index 8d0381cbb85..b9e4aa53c02 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
@@ -196,61 +196,72 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         return engine.estimateFlushableMemorySize();
     }

-    @Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
-        writeAllowed();
-        return innerCreate(type, id, source);
-    }
-
-    private ParsedDocument innerCreate(String type, String id, byte[] source) {
+    @Override public Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException {
         DocumentMapper docMapper = mapperService.type(type);
         if (docMapper == null) {
             throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
         }
         ParsedDocument doc = docMapper.parse(type, id, source);
+        return new Engine.Create(doc, docMapper.mappers().indexAnalyzer());
+    }
+
+    @Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
+        return create(prepareCreate(type, id, source));
+    }
+
+    @Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
+        writeAllowed();
         if (logger.isTraceEnabled()) {
-            logger.trace("index {}", doc);
+            logger.trace("index {}", create.doc());
         }
-        engine.create(new Engine.Create(doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
-        return doc;
+        engine.create(create);
+        return create.parsedDoc();
+    }
+
+    @Override public Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException {
+        DocumentMapper docMapper = mapperService.type(type);
+        if (docMapper == null) {
+            throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
+        }
+        ParsedDocument doc = docMapper.parse(type, id, source);
+        return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc, docMapper.mappers().indexAnalyzer());
     }

     @Override public ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException {
-        writeAllowed();
-        return innerIndex(type, id, source);
+        return index(prepareIndex(type, id, source));
     }

-    private ParsedDocument innerIndex(String type, String id, byte[] source) {
+    @Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
+        writeAllowed();
+        if (logger.isTraceEnabled()) {
+            logger.trace("index {}", index.doc());
+        }
+        engine.index(index);
+        return index.parsedDoc();
+    }
+
+    @Override public Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException {
         DocumentMapper docMapper = mapperService.type(type);
         if (docMapper == null) {
             throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
         }
-        ParsedDocument doc = docMapper.parse(type, id, source);
-        if (logger.isTraceEnabled()) {
-            logger.trace("index {}", doc);
-        }
-        engine.index(new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
-        return doc;
+        return new Engine.Delete(docMapper.uidMapper().term(type, id));
     }

     @Override public void delete(String type, String id) {
-        writeAllowed();
-        DocumentMapper docMapper = mapperService.type(type);
-        if (docMapper == null) {
-            throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
-        }
-        innerDelete(docMapper.uidMapper().term(type, id));
+        delete(prepareDelete(type, id));
     }

     @Override public void delete(Term uid) {
-        writeAllowed();
-        innerDelete(uid);
+        delete(new Engine.Delete(uid));
     }

-    private void innerDelete(Term uid) {
+    @Override public void delete(Engine.Delete delete) throws ElasticSearchException {
+        writeAllowed();
         if (logger.isTraceEnabled()) {
-            logger.trace("delete [{}]", uid.text());
+            logger.trace("delete [{}]", delete.uid().text());
         }
-        engine.delete(new Engine.Delete(uid));
+        engine.delete(delete);
     }

     @Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
@@ -436,15 +447,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
                 switch (operation.opType()) {
                     case CREATE:
                         Translog.Create create = (Translog.Create) operation;
-                        innerCreate(create.type(), create.id(), create.source());
+                        engine.create(prepareCreate(create.type(), create.id(), create.source()));
                         break;
                     case SAVE:
                         Translog.Index index = (Translog.Index) operation;
-                        innerIndex(index.type(), index.id(), index.source());
+                        engine.index(prepareIndex(index.type(), index.id(), index.source()));
                         break;
                     case DELETE:
                         Translog.Delete delete = (Translog.Delete) operation;
-                        innerDelete(delete.uid());
+                        engine.delete(new Engine.Delete(delete.uid()));
                         break;
                     case DELETE_BY_QUERY:
                         Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
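[editor's note, not part of the patch] The prepare* methods above stop at mapping, which is what the commit title points at: a batch of shard-level operations can be parsed up front and then handed to Engine#bulk in one call. No IndexShard#bulk entry point exists in this commit, so the composition below is purely hypothetical and only shows how the pieces line up:

    // Hypothetical composition, not present in this patch; names and sources are illustrative.
    static EngineException[] shardBulkSketch(InternalIndexShard shard, Engine engine, byte[] source1, byte[] source2) {
        Engine.Operation[] ops = new Engine.Operation[]{
                shard.prepareCreate("type1", "1", source1),   // mapping/parsing happens here
                shard.prepareIndex("type1", "2", source2),
                shard.prepareDelete("type1", "3")
        };
        // the engine then applies the whole batch under a single read-lock acquisition and
        // reports per-operation failures (null means everything succeeded)
        return engine.bulk(new Engine.Bulk(ops));
    }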
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java
index dfc61687961..fab89dfe5aa 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java
@@ -22,11 +22,13 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
+import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
@@ -42,6 +44,7 @@ import org.testng.annotations.Test;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -115,7 +118,8 @@ public abstract class AbstractSimpleEngineTests {
         searchResult.release();

         // create a document
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));

         // its not there...
         searchResult = engine.searcher();
@@ -133,7 +137,8 @@
         searchResult.release();

         // now do an update
-        engine.index(new Engine.Index(newUid("1"), doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), B_1, false);
+        engine.index(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));

         // its not updated yet...
         searchResult = engine.searcher();
@@ -171,7 +176,8 @@
         searchResult.release();

         // add it back
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));

         // its not there...
         searchResult = engine.searcher();
@@ -195,7 +201,8 @@
         // make sure we can still work with the engine
         // now do an update
-        engine.index(new Engine.Index(newUid("1"), doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), B_1, false);
+        engine.index(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));

         // its not updated yet...
         searchResult = engine.searcher();
@@ -216,13 +223,46 @@ public abstract class AbstractSimpleEngineTests {
         engine.close();
     }

+    @Test public void testBulkOperations() throws Exception {
+        Engine.Searcher searchResult = engine.searcher();
+        assertThat(searchResult, engineSearcherTotalHits(0));
+        searchResult.release();
+
+        List ops = Lists.newArrayList();
+        ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test")).build(), B_1, false);
+        ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
+        doc = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "2_test")).build(), B_2, false);
+        ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
+        doc = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "3_test")).build(), B_3, false);
+        ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
+        doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test1")).build(), B_1, false);
+        ops.add(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));
+        ops.add(new Engine.Delete(newUid("2")));
+
+        EngineException[] failures = engine.bulk(new Engine.Bulk(ops.toArray(new Engine.Operation[ops.size()])));
+        assertThat(failures, nullValue());
+
+        engine.refresh(new Engine.Refresh(true));
+
+        searchResult = engine.searcher();
+        assertThat(searchResult, engineSearcherTotalHits(2));
+        assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "1")), 1));
+        assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "2")), 0));
+        assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "3")), 1));
+
+        assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test")), 0));
+        assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test1")), 1));
+        searchResult.release();
+    }
+
     @Test public void testSearchResultRelease() throws Exception {
         Engine.Searcher searchResult = engine.searcher();
         assertThat(searchResult, engineSearcherTotalHits(0));
         searchResult.release();

         // create a document
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));

         // its not there...
         searchResult = engine.searcher();
@@ -254,7 +294,8 @@

     @Test public void testSimpleSnapshot() throws Exception {
         // create a document
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER));

         final ExecutorService executorService = Executors.newCachedThreadPool();

@@ -269,9 +310,11 @@
         Future future = executorService.submit(new Callable() {
             @Override public Object call() throws Exception {
                 engine.flush(new Engine.Flush());
-                engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2));
+                ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false);
+                engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER));
                 engine.flush(new Engine.Flush());
-                engine.create(new Engine.Create(doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "3", B_3));
+                ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), B_3, false);
+                engine.create(new Engine.Create(doc3, Lucene.STANDARD_ANALYZER));
                 return null;
             }
         });
@@ -305,7 +348,8 @@
     }

     @Test public void testSimpleRecover() throws Exception {
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
         engine.flush(new Engine.Flush());

         engine.recover(new Engine.RecoveryHandler() {
@@ -345,9 +389,11 @@
     }

     @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
+        ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
+        engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER));
         engine.flush(new Engine.Flush());
-        engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2));
+        ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false);
+        engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER));

         engine.recover(new Engine.RecoveryHandler() {
             @Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
@@ -370,9 +416,11 @@
     }

     @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
-        engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
"1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false); + engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER)); engine.flush(new Engine.Flush()); - engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2)); + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false); + engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER)); engine.recover(new Engine.RecoveryHandler() { @Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException { @@ -385,7 +433,8 @@ public abstract class AbstractSimpleEngineTests { assertThat(create.source(), equalTo(B_2)); // add for phase3 - engine.create(new Engine.Create(doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "3", B_3)); + ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), B_3, false); + engine.create(new Engine.Create(doc3, Lucene.STANDARD_ANALYZER)); } @Override public void phase3(Translog.Snapshot snapshot) throws EngineException {