mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 14:26:27 +00:00
internal bulk operaiton on the index shard
This commit is contained in:
parent
85160ae341
commit
d96ffe9153
@ -36,6 +36,7 @@ import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.robin.RobinEngine;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
@ -163,10 +164,11 @@ public class SimpleEngineBenchmark {
|
||||
String sId = Integer.toString(id);
|
||||
Document doc = doc().add(field("_id", sId))
|
||||
.add(field("content", contentItem)).build();
|
||||
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, TRANSLOG_PAYLOAD, false);
|
||||
if (create) {
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
|
||||
engine.create(new Engine.Create(pDoc, Lucene.STANDARD_ANALYZER));
|
||||
} else {
|
||||
engine.index(new Engine.Index(new Term("_id", sId), doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
|
||||
engine.index(new Engine.Index(new Term("_id", sId), pDoc, Lucene.STANDARD_ANALYZER));
|
||||
}
|
||||
}
|
||||
engine.refresh(new Engine.Refresh(true));
|
||||
@ -276,10 +278,11 @@ public class SimpleEngineBenchmark {
|
||||
String sId = Integer.toString(id);
|
||||
Document doc = doc().add(field("_id", sId))
|
||||
.add(field("content", content(id))).build();
|
||||
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, TRANSLOG_PAYLOAD, false);
|
||||
if (create) {
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
|
||||
engine.create(new Engine.Create(pDoc, Lucene.STANDARD_ANALYZER));
|
||||
} else {
|
||||
engine.index(new Engine.Index(new Term("_id", sId), doc, Lucene.STANDARD_ANALYZER, "type", sId, TRANSLOG_PAYLOAD));
|
||||
engine.index(new Engine.Index(new Term("_id", sId), pDoc, Lucene.STANDARD_ANALYZER));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadSafe;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
@ -52,6 +53,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
*/
|
||||
void start() throws EngineException;
|
||||
|
||||
EngineException[] bulk(Bulk bulk) throws EngineException;
|
||||
|
||||
void create(Create create) throws EngineException;
|
||||
|
||||
void index(Index index) throws EngineException;
|
||||
@ -237,31 +240,55 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
}
|
||||
}
|
||||
|
||||
static class Create {
|
||||
private final Document document;
|
||||
private final Analyzer analyzer;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final byte[] source;
|
||||
static interface Operation {
|
||||
static enum Type {
|
||||
CREATE,
|
||||
INDEX,
|
||||
DELETE
|
||||
}
|
||||
|
||||
public Create(Document document, Analyzer analyzer, String type, String id, byte[] source) {
|
||||
this.document = document;
|
||||
Type opType();
|
||||
}
|
||||
|
||||
static class Bulk {
|
||||
private final Operation[] ops;
|
||||
|
||||
public Bulk(Operation[] ops) {
|
||||
this.ops = ops;
|
||||
}
|
||||
|
||||
public Operation[] ops() {
|
||||
return this.ops;
|
||||
}
|
||||
}
|
||||
|
||||
static class Create implements Operation {
|
||||
private final ParsedDocument doc;
|
||||
private final Analyzer analyzer;
|
||||
|
||||
public Create(ParsedDocument doc, Analyzer analyzer) {
|
||||
this.doc = doc;
|
||||
this.analyzer = analyzer;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
@Override public Type opType() {
|
||||
return Type.CREATE;
|
||||
}
|
||||
|
||||
public ParsedDocument parsedDoc() {
|
||||
return this.doc;
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return this.type;
|
||||
return this.doc.type();
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return this.id;
|
||||
return this.doc.id();
|
||||
}
|
||||
|
||||
public Document doc() {
|
||||
return this.document;
|
||||
return this.doc.doc();
|
||||
}
|
||||
|
||||
public Analyzer analyzer() {
|
||||
@ -269,33 +296,35 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
}
|
||||
|
||||
public byte[] source() {
|
||||
return this.source;
|
||||
return this.doc.source();
|
||||
}
|
||||
}
|
||||
|
||||
static class Index {
|
||||
static class Index implements Operation {
|
||||
private final Term uid;
|
||||
private final Document document;
|
||||
private final ParsedDocument doc;
|
||||
private final Analyzer analyzer;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final byte[] source;
|
||||
|
||||
public Index(Term uid, Document document, Analyzer analyzer, String type, String id, byte[] source) {
|
||||
public Index(Term uid, ParsedDocument doc, Analyzer analyzer) {
|
||||
this.uid = uid;
|
||||
this.document = document;
|
||||
this.doc = doc;
|
||||
this.analyzer = analyzer;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
@Override public Type opType() {
|
||||
return Type.INDEX;
|
||||
}
|
||||
|
||||
public Term uid() {
|
||||
return this.uid;
|
||||
}
|
||||
|
||||
public ParsedDocument parsedDoc() {
|
||||
return this.doc;
|
||||
}
|
||||
|
||||
public Document doc() {
|
||||
return this.document;
|
||||
return this.doc.doc();
|
||||
}
|
||||
|
||||
public Analyzer analyzer() {
|
||||
@ -303,25 +332,29 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return this.id;
|
||||
return this.doc.id();
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return this.type;
|
||||
return this.doc.type();
|
||||
}
|
||||
|
||||
public byte[] source() {
|
||||
return this.source;
|
||||
return this.doc.source();
|
||||
}
|
||||
}
|
||||
|
||||
static class Delete {
|
||||
static class Delete implements Operation {
|
||||
private final Term uid;
|
||||
|
||||
public Delete(Term uid) {
|
||||
this.uid = uid;
|
||||
}
|
||||
|
||||
@Override public Type opType() {
|
||||
return Type.DELETE;
|
||||
}
|
||||
|
||||
public Term uid() {
|
||||
return this.uid;
|
||||
}
|
||||
|
@ -179,6 +179,58 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
|
||||
return refreshInterval;
|
||||
}
|
||||
|
||||
@Override public EngineException[] bulk(Bulk bulk) throws EngineException {
|
||||
EngineException[] failures = null;
|
||||
rwl.readLock().lock();
|
||||
try {
|
||||
IndexWriter writer = this.indexWriter;
|
||||
if (writer == null) {
|
||||
throw new EngineClosedException(shardId);
|
||||
}
|
||||
for (int i = 0; i < bulk.ops().length; i++) {
|
||||
Operation op = bulk.ops()[i];
|
||||
try {
|
||||
switch (op.opType()) {
|
||||
case CREATE:
|
||||
Create create = (Create) op;
|
||||
writer.addDocument(create.doc(), create.analyzer());
|
||||
translog.add(new Translog.Create(create));
|
||||
break;
|
||||
case INDEX:
|
||||
Index index = (Index) op;
|
||||
writer.updateDocument(index.uid(), index.doc(), index.analyzer());
|
||||
translog.add(new Translog.Index(index));
|
||||
break;
|
||||
case DELETE:
|
||||
Delete delete = (Delete) op;
|
||||
writer.deleteDocuments(delete.uid());
|
||||
translog.add(new Translog.Delete(delete));
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (failures == null) {
|
||||
failures = new EngineException[bulk.ops().length];
|
||||
}
|
||||
switch (op.opType()) {
|
||||
case CREATE:
|
||||
failures[i] = new CreateFailedEngineException(shardId, (Create) op, e);
|
||||
break;
|
||||
case INDEX:
|
||||
failures[i] = new IndexFailedEngineException(shardId, (Index) op, e);
|
||||
break;
|
||||
case DELETE:
|
||||
failures[i] = new DeleteFailedEngineException(shardId, (Delete) op, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
dirty = true;
|
||||
} finally {
|
||||
rwl.readLock().unlock();
|
||||
}
|
||||
return failures;
|
||||
}
|
||||
|
||||
@Override public void create(Create create) throws EngineException {
|
||||
rwl.readLock().lock();
|
||||
try {
|
||||
|
@ -48,13 +48,25 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
|
||||
*/
|
||||
ByteSizeValue estimateFlushableMemorySize() throws ElasticSearchException;
|
||||
|
||||
Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException;
|
||||
|
||||
ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException;
|
||||
|
||||
ParsedDocument create(Engine.Create create) throws ElasticSearchException;
|
||||
|
||||
Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException;
|
||||
|
||||
ParsedDocument index(Engine.Index index) throws ElasticSearchException;
|
||||
|
||||
ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException;
|
||||
|
||||
void delete(String type, String id);
|
||||
Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException;
|
||||
|
||||
void delete(Term uid);
|
||||
void delete(Engine.Delete delete) throws ElasticSearchException;
|
||||
|
||||
void delete(String type, String id) throws ElasticSearchException;
|
||||
|
||||
void delete(Term uid) throws ElasticSearchException;
|
||||
|
||||
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
|
||||
|
||||
|
@ -196,61 +196,72 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
return engine.estimateFlushableMemorySize();
|
||||
}
|
||||
|
||||
@Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
return innerCreate(type, id, source);
|
||||
}
|
||||
|
||||
private ParsedDocument innerCreate(String type, String id, byte[] source) {
|
||||
@Override public Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException {
|
||||
DocumentMapper docMapper = mapperService.type(type);
|
||||
if (docMapper == null) {
|
||||
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
|
||||
}
|
||||
ParsedDocument doc = docMapper.parse(type, id, source);
|
||||
return new Engine.Create(doc, docMapper.mappers().indexAnalyzer());
|
||||
}
|
||||
|
||||
@Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
|
||||
return create(prepareCreate(type, id, source));
|
||||
}
|
||||
|
||||
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("index {}", doc);
|
||||
logger.trace("index {}", create.doc());
|
||||
}
|
||||
engine.create(new Engine.Create(doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
|
||||
return doc;
|
||||
engine.create(create);
|
||||
return create.parsedDoc();
|
||||
}
|
||||
|
||||
@Override public Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException {
|
||||
DocumentMapper docMapper = mapperService.type(type);
|
||||
if (docMapper == null) {
|
||||
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
|
||||
}
|
||||
ParsedDocument doc = docMapper.parse(type, id, source);
|
||||
return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc, docMapper.mappers().indexAnalyzer());
|
||||
}
|
||||
|
||||
@Override public ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
return innerIndex(type, id, source);
|
||||
return index(prepareIndex(type, id, source));
|
||||
}
|
||||
|
||||
private ParsedDocument innerIndex(String type, String id, byte[] source) {
|
||||
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("index {}", index.doc());
|
||||
}
|
||||
engine.index(index);
|
||||
return index.parsedDoc();
|
||||
}
|
||||
|
||||
@Override public Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException {
|
||||
DocumentMapper docMapper = mapperService.type(type);
|
||||
if (docMapper == null) {
|
||||
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
|
||||
}
|
||||
ParsedDocument doc = docMapper.parse(type, id, source);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("index {}", doc);
|
||||
}
|
||||
engine.index(new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc.doc(), docMapper.mappers().indexAnalyzer(), docMapper.type(), doc.id(), doc.source()));
|
||||
return doc;
|
||||
return new Engine.Delete(docMapper.uidMapper().term(type, id));
|
||||
}
|
||||
|
||||
@Override public void delete(String type, String id) {
|
||||
writeAllowed();
|
||||
DocumentMapper docMapper = mapperService.type(type);
|
||||
if (docMapper == null) {
|
||||
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]");
|
||||
}
|
||||
innerDelete(docMapper.uidMapper().term(type, id));
|
||||
delete(prepareDelete(type, id));
|
||||
}
|
||||
|
||||
@Override public void delete(Term uid) {
|
||||
writeAllowed();
|
||||
innerDelete(uid);
|
||||
delete(new Engine.Delete(uid));
|
||||
}
|
||||
|
||||
private void innerDelete(Term uid) {
|
||||
@Override public void delete(Engine.Delete delete) throws ElasticSearchException {
|
||||
writeAllowed();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("delete [{}]", uid.text());
|
||||
logger.trace("delete [{}]", delete.uid().text());
|
||||
}
|
||||
engine.delete(new Engine.Delete(uid));
|
||||
engine.delete(delete);
|
||||
}
|
||||
|
||||
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
|
||||
@ -436,15 +447,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
switch (operation.opType()) {
|
||||
case CREATE:
|
||||
Translog.Create create = (Translog.Create) operation;
|
||||
innerCreate(create.type(), create.id(), create.source());
|
||||
engine.create(prepareCreate(create.type(), create.id(), create.source()));
|
||||
break;
|
||||
case SAVE:
|
||||
Translog.Index index = (Translog.Index) operation;
|
||||
innerIndex(index.type(), index.id(), index.source());
|
||||
engine.index(prepareIndex(index.type(), index.id(), index.source()));
|
||||
break;
|
||||
case DELETE:
|
||||
Translog.Delete delete = (Translog.Delete) operation;
|
||||
innerDelete(delete.uid());
|
||||
engine.delete(new Engine.Delete(delete.uid()));
|
||||
break;
|
||||
case DELETE_BY_QUERY:
|
||||
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
|
||||
|
@ -22,11 +22,13 @@ package org.elasticsearch.index.engine;
|
||||
import org.apache.lucene.index.IndexDeletionPolicy;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
@ -42,6 +44,7 @@ import org.testng.annotations.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -115,7 +118,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
searchResult.release();
|
||||
|
||||
// create a document
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -133,7 +137,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
searchResult.release();
|
||||
|
||||
// now do an update
|
||||
engine.index(new Engine.Index(newUid("1"), doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), B_1, false);
|
||||
engine.index(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.searcher();
|
||||
@ -171,7 +176,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
searchResult.release();
|
||||
|
||||
// add it back
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -195,7 +201,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// make sure we can still work with the engine
|
||||
// now do an update
|
||||
engine.index(new Engine.Index(newUid("1"), doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), B_1, false);
|
||||
engine.index(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.searcher();
|
||||
@ -216,13 +223,46 @@ public abstract class AbstractSimpleEngineTests {
|
||||
engine.close();
|
||||
}
|
||||
|
||||
@Test public void testBulkOperations() throws Exception {
|
||||
Engine.Searcher searchResult = engine.searcher();
|
||||
assertThat(searchResult, engineSearcherTotalHits(0));
|
||||
searchResult.release();
|
||||
|
||||
List<Engine.Operation> ops = Lists.newArrayList();
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test")).build(), B_1, false);
|
||||
ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
doc = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "2_test")).build(), B_2, false);
|
||||
ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
doc = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "3_test")).build(), B_3, false);
|
||||
ops.add(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test1")).build(), B_1, false);
|
||||
ops.add(new Engine.Index(newUid("1"), doc, Lucene.STANDARD_ANALYZER));
|
||||
ops.add(new Engine.Delete(newUid("2")));
|
||||
|
||||
EngineException[] failures = engine.bulk(new Engine.Bulk(ops.toArray(new Engine.Operation[ops.size()])));
|
||||
assertThat(failures, nullValue());
|
||||
|
||||
engine.refresh(new Engine.Refresh(true));
|
||||
|
||||
searchResult = engine.searcher();
|
||||
assertThat(searchResult, engineSearcherTotalHits(2));
|
||||
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "1")), 1));
|
||||
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "2")), 0));
|
||||
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("_uid", "3")), 1));
|
||||
|
||||
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test")), 0));
|
||||
assertThat(searchResult, engineSearcherTotalHits(new TermQuery(new Term("value", "1_test1")), 1));
|
||||
searchResult.release();
|
||||
}
|
||||
|
||||
@Test public void testSearchResultRelease() throws Exception {
|
||||
Engine.Searcher searchResult = engine.searcher();
|
||||
assertThat(searchResult, engineSearcherTotalHits(0));
|
||||
searchResult.release();
|
||||
|
||||
// create a document
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -254,7 +294,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testSimpleSnapshot() throws Exception {
|
||||
// create a document
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
@ -269,9 +310,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
Future<Object> future = executorService.submit(new Callable<Object>() {
|
||||
@Override public Object call() throws Exception {
|
||||
engine.flush(new Engine.Flush());
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2));
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false);
|
||||
engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER));
|
||||
engine.flush(new Engine.Flush());
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "3", B_3));
|
||||
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), B_3, false);
|
||||
engine.create(new Engine.Create(doc3, Lucene.STANDARD_ANALYZER));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
@ -305,7 +348,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
@Test public void testSimpleRecover() throws Exception {
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc, Lucene.STANDARD_ANALYZER));
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@ -345,9 +389,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER));
|
||||
engine.flush(new Engine.Flush());
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2));
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false);
|
||||
engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
|
||||
@ -370,9 +416,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "1", B_1));
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), B_1, false);
|
||||
engine.create(new Engine.Create(doc1, Lucene.STANDARD_ANALYZER));
|
||||
engine.flush(new Engine.Flush());
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "2", B_2));
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), B_2, false);
|
||||
engine.create(new Engine.Create(doc2, Lucene.STANDARD_ANALYZER));
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
|
||||
@ -385,7 +433,8 @@ public abstract class AbstractSimpleEngineTests {
|
||||
assertThat(create.source(), equalTo(B_2));
|
||||
|
||||
// add for phase3
|
||||
engine.create(new Engine.Create(doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, "test", "3", B_3));
|
||||
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), B_3, false);
|
||||
engine.create(new Engine.Create(doc3, Lucene.STANDARD_ANALYZER));
|
||||
}
|
||||
|
||||
@Override public void phase3(Translog.Snapshot snapshot) throws EngineException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user