mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
properly clean mapper data, and cache some mapping structures
This commit is contained in:
parent
efb3e97ce4
commit
166493f9d5
@ -167,9 +167,9 @@ public class SimpleEngineBenchmark {
|
||||
.add(field("content", contentItem)).build();
|
||||
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
|
||||
if (create) {
|
||||
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
|
||||
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
|
||||
} else {
|
||||
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
|
||||
engine.index(new Engine.Index(null, new Term("_id", sId), pDoc));
|
||||
}
|
||||
}
|
||||
engine.refresh(new Engine.Refresh(true));
|
||||
@ -281,9 +281,9 @@ public class SimpleEngineBenchmark {
|
||||
.add(field("content", content(id))).build();
|
||||
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
|
||||
if (create) {
|
||||
engine.create(new Engine.Create(new Term("_id", sId), pDoc));
|
||||
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
|
||||
} else {
|
||||
engine.index(new Engine.Index(new Term("_id", sId), pDoc));
|
||||
engine.index(new Engine.Index(null, new Term("_id", sId), pDoc));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -40,7 +40,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.percolator.PercolatorExecutor;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
@ -106,7 +105,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
final BulkShardRequest request = shardRequest.request;
|
||||
IndexShard indexShard = indexShard(shardRequest);
|
||||
|
||||
Engine.Operation[] ops = new Engine.Operation[request.items().length];
|
||||
Engine.IndexingOperation[] ops = new Engine.IndexingOperation[request.items().length];
|
||||
|
||||
|
||||
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
||||
@ -127,16 +126,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
|
||||
.routing(indexRequest.routing()).parent(indexRequest.parent());
|
||||
long version;
|
||||
ParsedDocument doc;
|
||||
Engine.Operation op;
|
||||
Engine.IndexingOperation op;
|
||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
|
||||
doc = indexShard.index(index);
|
||||
indexShard.index(index);
|
||||
version = index.version();
|
||||
op = index;
|
||||
} else {
|
||||
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
|
||||
doc = indexShard.create(create);
|
||||
indexShard.create(create);
|
||||
version = create.version();
|
||||
op = create;
|
||||
}
|
||||
@ -144,16 +142,18 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
indexRequest.version(version);
|
||||
|
||||
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
|
||||
if (doc.mappersAdded()) {
|
||||
if (op.parsedDoc().mappersAdded()) {
|
||||
updateMappingOnMaster(indexRequest);
|
||||
}
|
||||
|
||||
// if we are going to percolate, then we need to keep this op for the postPrimary operation
|
||||
if (Strings.hasLength(indexRequest.percolate())) {
|
||||
if (ops == null) {
|
||||
ops = new Engine.Operation[request.items().length];
|
||||
ops = new Engine.IndexingOperation[request.items().length];
|
||||
}
|
||||
ops[i] = op;
|
||||
} else {
|
||||
op.docMapper().processDocumentAfterIndex(op.doc());
|
||||
}
|
||||
|
||||
// add the response
|
||||
@ -200,7 +200,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
|
||||
@Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse> response) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
Engine.Operation[] ops = (Engine.Operation[]) response.payload();
|
||||
Engine.IndexingOperation[] ops = (Engine.IndexingOperation[]) response.payload();
|
||||
for (int i = 0; i < ops.length; i++) {
|
||||
BulkItemRequest itemRequest = request.items()[i];
|
||||
BulkItemResponse itemResponse = response.response().responses()[i];
|
||||
@ -208,7 +208,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
// failure, continue
|
||||
continue;
|
||||
}
|
||||
if (ops[i] == null) {
|
||||
Engine.IndexingOperation op = ops[i];
|
||||
if (op == null) {
|
||||
continue; // failed
|
||||
}
|
||||
if (itemRequest.request() instanceof IndexRequest) {
|
||||
@ -216,17 +217,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
if (!Strings.hasLength(indexRequest.percolate())) {
|
||||
continue;
|
||||
}
|
||||
ParsedDocument doc;
|
||||
if (ops[i] instanceof Engine.Create) {
|
||||
doc = ((Engine.Create) ops[i]).parsedDoc();
|
||||
} else {
|
||||
doc = ((Engine.Index) ops[i]).parsedDoc();
|
||||
}
|
||||
try {
|
||||
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(doc, indexRequest.percolate()));
|
||||
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(op.parsedDoc(), indexRequest.percolate()));
|
||||
((IndexResponse) itemResponse.response()).matches(percolate.matches());
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to percolate [{}]", e, itemRequest.request());
|
||||
} finally {
|
||||
op.docMapper().processDocumentAfterIndex(op.doc());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -245,9 +242,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
|
||||
indexShard.index(index);
|
||||
index.docMapper().processDocumentAfterIndex(index.doc());
|
||||
} else {
|
||||
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
|
||||
indexShard.create(create);
|
||||
create.docMapper().processDocumentAfterIndex(create.doc());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore, we are on backup
|
||||
|
@ -42,7 +42,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.percolator.PercolatorExecutor;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
@ -174,22 +173,24 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||
IndexShard indexShard = indexShard(shardRequest);
|
||||
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
|
||||
.routing(request.routing()).parent(request.parent());
|
||||
ParsedDocument doc;
|
||||
long version;
|
||||
Engine.IndexingOperation op;
|
||||
if (request.opType() == IndexRequest.OpType.INDEX) {
|
||||
Engine.Index index = indexShard.prepareIndex(sourceToParse)
|
||||
.version(request.version())
|
||||
.versionType(request.versionType())
|
||||
.origin(Engine.Operation.Origin.PRIMARY);
|
||||
doc = indexShard.index(index);
|
||||
indexShard.index(index);
|
||||
version = index.version();
|
||||
op = index;
|
||||
} else {
|
||||
Engine.Create create = indexShard.prepareCreate(sourceToParse)
|
||||
.version(request.version())
|
||||
.versionType(request.versionType())
|
||||
.origin(Engine.Operation.Origin.PRIMARY);
|
||||
doc = indexShard.create(create);
|
||||
indexShard.create(create);
|
||||
version = create.version();
|
||||
op = create;
|
||||
}
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
@ -198,26 +199,30 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
if (doc.mappersAdded()) {
|
||||
if (op.parsedDoc().mappersAdded()) {
|
||||
updateMappingOnMaster(request);
|
||||
}
|
||||
// update the version on the request, so it will be used for the replicas
|
||||
request.version(version);
|
||||
|
||||
IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version);
|
||||
return new PrimaryResponse<IndexResponse>(response, doc);
|
||||
return new PrimaryResponse<IndexResponse>(response, op);
|
||||
}
|
||||
|
||||
@Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse> response) {
|
||||
Engine.IndexingOperation op = (Engine.IndexingOperation) response.payload();
|
||||
if (!Strings.hasLength(request.percolate())) {
|
||||
op.docMapper().processDocumentAfterIndex(op.doc());
|
||||
return;
|
||||
}
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
try {
|
||||
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest((ParsedDocument) response.payload(), request.percolate()));
|
||||
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(op.parsedDoc(), request.percolate()));
|
||||
response.response().matches(percolate.matches());
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to percolate [{}]", e, request);
|
||||
} finally {
|
||||
op.docMapper().processDocumentAfterIndex(op.doc());
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,11 +236,13 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||
.version(request.version())
|
||||
.origin(Engine.Operation.Origin.REPLICA);
|
||||
indexShard.index(index);
|
||||
index.docMapper().processDocumentAfterIndex(index.doc());
|
||||
} else {
|
||||
Engine.Create create = indexShard.prepareCreate(sourceToParse)
|
||||
.version(request.version())
|
||||
.origin(Engine.Operation.Origin.REPLICA);
|
||||
indexShard.create(create);
|
||||
create.docMapper().processDocumentAfterIndex(create.doc());
|
||||
}
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
|
@ -19,8 +19,8 @@
|
||||
|
||||
package org.elasticsearch.common.lucene.uid;
|
||||
|
||||
import org.apache.lucene.analysis.TokenFilter;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
|
||||
import org.apache.lucene.document.AbstractField;
|
||||
import org.apache.lucene.document.Field;
|
||||
@ -29,7 +29,6 @@ import org.apache.lucene.index.Payload;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.TermPositions;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.common.io.FastStringReader;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -116,21 +115,28 @@ public class UidField extends AbstractField {
|
||||
}
|
||||
}
|
||||
|
||||
private final String uid;
|
||||
private String uid;
|
||||
|
||||
private long version;
|
||||
|
||||
private final UidPayloadTokenStream tokenStream;
|
||||
|
||||
public UidField(String name, String uid, long version) {
|
||||
super(name, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.NO);
|
||||
this.uid = uid;
|
||||
this.version = version;
|
||||
this.omitTermFreqAndPositions = false;
|
||||
this.tokenStream = new UidPayloadTokenStream(this);
|
||||
}
|
||||
|
||||
@Override public void setOmitTermFreqAndPositions(boolean omitTermFreqAndPositions) {
|
||||
// never allow to set this, since we want payload!
|
||||
}
|
||||
|
||||
public void setUid(String uid) {
|
||||
this.uid = uid;
|
||||
}
|
||||
|
||||
@Override public String stringValue() {
|
||||
return uid;
|
||||
}
|
||||
@ -148,30 +154,34 @@ public class UidField extends AbstractField {
|
||||
}
|
||||
|
||||
@Override public TokenStream tokenStreamValue() {
|
||||
try {
|
||||
return new UidPayloadTokenStream(Lucene.KEYWORD_ANALYZER.reusableTokenStream("_uid", new FastStringReader(uid)), this);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("failed to create token stream", e);
|
||||
}
|
||||
return tokenStream;
|
||||
}
|
||||
|
||||
public static final class UidPayloadTokenStream extends TokenFilter {
|
||||
public static final class UidPayloadTokenStream extends TokenStream {
|
||||
|
||||
private final PayloadAttribute payloadAttribute;
|
||||
private final PayloadAttribute payloadAttribute = addAttribute(PayloadAttribute.class);
|
||||
private final CharTermAttribute termAtt = addAttribute(CharTermAttribute.class);
|
||||
|
||||
private final UidField field;
|
||||
|
||||
public UidPayloadTokenStream(TokenStream input, UidField field) {
|
||||
super(input);
|
||||
private boolean added = false;
|
||||
|
||||
public UidPayloadTokenStream(UidField field) {
|
||||
this.field = field;
|
||||
payloadAttribute = addAttribute(PayloadAttribute.class);
|
||||
}
|
||||
|
||||
@Override public void reset() throws IOException {
|
||||
added = false;
|
||||
}
|
||||
|
||||
@Override public final boolean incrementToken() throws IOException {
|
||||
if (!input.incrementToken()) {
|
||||
if (added) {
|
||||
return false;
|
||||
}
|
||||
termAtt.setLength(0);
|
||||
termAtt.append(field.uid);
|
||||
payloadAttribute.setPayload(new Payload(Numbers.longToBytes(field.version())));
|
||||
added = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadSafe;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||
@ -291,18 +292,33 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
Origin origin();
|
||||
}
|
||||
|
||||
static class Create implements Operation {
|
||||
static interface IndexingOperation extends Operation {
|
||||
|
||||
ParsedDocument parsedDoc();
|
||||
|
||||
Document doc();
|
||||
|
||||
DocumentMapper docMapper();
|
||||
}
|
||||
|
||||
static class Create implements IndexingOperation {
|
||||
private final DocumentMapper docMapper;
|
||||
private final Term uid;
|
||||
private final ParsedDocument doc;
|
||||
private long version;
|
||||
private VersionType versionType = VersionType.INTERNAL;
|
||||
private Origin origin = Origin.PRIMARY;
|
||||
|
||||
public Create(Term uid, ParsedDocument doc) {
|
||||
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
||||
this.docMapper = docMapper;
|
||||
this.uid = uid;
|
||||
this.doc = doc;
|
||||
}
|
||||
|
||||
public DocumentMapper docMapper() {
|
||||
return this.docMapper;
|
||||
}
|
||||
|
||||
@Override public Type opType() {
|
||||
return Type.CREATE;
|
||||
}
|
||||
@ -375,18 +391,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
}
|
||||
}
|
||||
|
||||
static class Index implements Operation {
|
||||
static class Index implements IndexingOperation {
|
||||
private final DocumentMapper docMapper;
|
||||
private final Term uid;
|
||||
private final ParsedDocument doc;
|
||||
private long version;
|
||||
private VersionType versionType = VersionType.INTERNAL;
|
||||
private Origin origin = Origin.PRIMARY;
|
||||
|
||||
public Index(Term uid, ParsedDocument doc) {
|
||||
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
||||
this.docMapper = docMapper;
|
||||
this.uid = uid;
|
||||
this.doc = doc;
|
||||
}
|
||||
|
||||
public DocumentMapper docMapper() {
|
||||
return this.docMapper;
|
||||
}
|
||||
|
||||
@Override public Type opType() {
|
||||
return Type.INDEX;
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.mapper;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
@ -33,6 +34,8 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe;
|
||||
@ThreadSafe
|
||||
public interface DocumentMapper {
|
||||
|
||||
void close();
|
||||
|
||||
String type();
|
||||
|
||||
/**
|
||||
@ -131,6 +134,8 @@ public interface DocumentMapper {
|
||||
*/
|
||||
void addFieldMapperListener(FieldMapperListener fieldMapperListener, boolean includeExisting);
|
||||
|
||||
void processDocumentAfterIndex(Document doc);
|
||||
|
||||
/**
|
||||
* A result of a merge.
|
||||
*/
|
||||
|
@ -168,4 +168,6 @@ public interface FieldMapper<T> {
|
||||
Filter rangeFilter(String lowerTerm, String upperTerm, boolean includeLower, boolean includeUpper);
|
||||
|
||||
FieldDataType fieldDataType();
|
||||
|
||||
void processFieldAfterIndex(Fieldable field);
|
||||
}
|
||||
|
@ -123,6 +123,12 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
|
||||
logger.debug("using dynamic[{}], default mapping: location[{}] and source[{}]", dynamic, defaultMappingLocation, defaultMappingSource);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
for (DocumentMapper documentMapper : mappers.values()) {
|
||||
documentMapper.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public UnmodifiableIterator<DocumentMapper> iterator() {
|
||||
return mappers.values().iterator();
|
||||
}
|
||||
@ -169,6 +175,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
|
||||
if (docMapper == null) {
|
||||
return;
|
||||
}
|
||||
docMapper.close();
|
||||
mappers = newMapBuilder(mappers).remove(type).immutableMap();
|
||||
|
||||
// we need to remove those mappers
|
||||
|
@ -417,4 +417,12 @@ public abstract class AbstractFieldMapper<T> implements FieldMapper<T>, XContent
|
||||
}
|
||||
|
||||
protected abstract String contentType();
|
||||
|
||||
@Override public void close() {
|
||||
// nothing to do here, sub classes to override if needed
|
||||
}
|
||||
|
||||
public void processFieldAfterIndex(Fieldable field) {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -121,4 +121,8 @@ public class AnalyzerMapper implements XContentMapper {
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.mapper.MergeMappingException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
@ -61,6 +62,12 @@ public class IdFieldMapper extends AbstractFieldMapper<String> implements org.el
|
||||
}
|
||||
}
|
||||
|
||||
private final ThreadLocal<ArrayDeque<Field>> fieldCache = new ThreadLocal<ArrayDeque<Field>>() {
|
||||
@Override protected ArrayDeque<Field> initialValue() {
|
||||
return new ArrayDeque<Field>();
|
||||
}
|
||||
};
|
||||
|
||||
protected IdFieldMapper() {
|
||||
this(Defaults.NAME, Defaults.INDEX_NAME);
|
||||
}
|
||||
@ -105,21 +112,43 @@ public class IdFieldMapper extends AbstractFieldMapper<String> implements org.el
|
||||
}
|
||||
context.id(id);
|
||||
context.parsedId(ParseContext.ParsedIdState.PARSED);
|
||||
return new Field(names.indexName(), context.id(), store, index);
|
||||
ArrayDeque<Field> cache = fieldCache.get();
|
||||
Field field = cache.poll();
|
||||
if (field == null) {
|
||||
field = new Field(names.indexName(), "", store, index);
|
||||
}
|
||||
field.setValue(context.id());
|
||||
return field;
|
||||
} else if (context.parsedIdState() == ParseContext.ParsedIdState.EXTERNAL) {
|
||||
if (context.id() == null) {
|
||||
throw new MapperParsingException("No id mapping with [" + names.name() + "] found in the content, and not explicitly set");
|
||||
}
|
||||
return new Field(names.indexName(), context.id(), store, index);
|
||||
ArrayDeque<Field> cache = fieldCache.get();
|
||||
Field field = cache.poll();
|
||||
if (field == null) {
|
||||
field = new Field(names.indexName(), "", store, index);
|
||||
}
|
||||
field.setValue(context.id());
|
||||
return field;
|
||||
} else {
|
||||
throw new MapperParsingException("Illegal parsed id state");
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void processFieldAfterIndex(Fieldable field) {
|
||||
Field field1 = (Field) field;
|
||||
field1.setValue("");
|
||||
fieldCache.get().add(field1);
|
||||
}
|
||||
|
||||
@Override protected String contentType() {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
fieldCache.remove();
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
// if all are defaults, no sense to write it at all
|
||||
if (store == Defaults.STORE) {
|
||||
|
@ -254,6 +254,15 @@ public class MultiFieldMapper implements XContentMapper, IncludeInAllMapper {
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
if (defaultMapper != null) {
|
||||
defaultMapper.close();
|
||||
}
|
||||
for (XContentMapper mapper : mappers.values()) {
|
||||
mapper.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void traverse(FieldMapperListener fieldMapperListener) {
|
||||
if (defaultMapper != null) {
|
||||
defaultMapper.traverse(fieldMapperListener);
|
||||
|
@ -26,7 +26,6 @@ import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.NumericUtils;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
import org.elasticsearch.index.cache.field.data.FieldDataCache;
|
||||
import org.elasticsearch.index.field.data.FieldDataType;
|
||||
@ -83,9 +82,9 @@ public abstract class NumberFieldMapper<T extends Number> extends AbstractFieldM
|
||||
|
||||
protected Boolean includeInAll;
|
||||
|
||||
private ThreadLocal<ThreadLocals.CleanableValue<NumericTokenStream>> tokenStream = new ThreadLocal<ThreadLocals.CleanableValue<NumericTokenStream>>() {
|
||||
@Override protected ThreadLocals.CleanableValue<NumericTokenStream> initialValue() {
|
||||
return new ThreadLocals.CleanableValue<NumericTokenStream>(new NumericTokenStream(precisionStep));
|
||||
private ThreadLocal<NumericTokenStream> tokenStream = new ThreadLocal<NumericTokenStream>() {
|
||||
@Override protected NumericTokenStream initialValue() {
|
||||
return new NumericTokenStream(precisionStep);
|
||||
}
|
||||
};
|
||||
|
||||
@ -168,10 +167,14 @@ public abstract class NumberFieldMapper<T extends Number> extends AbstractFieldM
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
tokenStream.remove();
|
||||
}
|
||||
|
||||
@Override public abstract FieldDataType fieldDataType();
|
||||
|
||||
protected NumericTokenStream popCachedStream() {
|
||||
return tokenStream.get().get();
|
||||
return tokenStream.get();
|
||||
}
|
||||
|
||||
// used to we can use a numeric field in a document that is then parsed twice!
|
||||
|
@ -578,6 +578,12 @@ public class ObjectMapper implements XContentMapper, IncludeInAllMapper {
|
||||
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
for (XContentMapper mapper : mappers.values()) {
|
||||
mapper.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
toXContent(builder, params, null, XContentMapper.EMPTY_ARRAY);
|
||||
return builder;
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper.xcontent;
|
||||
|
||||
import org.apache.lucene.document.*;
|
||||
import org.elasticsearch.ElasticSearchParseException;
|
||||
import org.elasticsearch.common.Bytes;
|
||||
import org.elasticsearch.common.compress.lzf.LZF;
|
||||
import org.elasticsearch.common.compress.lzf.LZFDecoder;
|
||||
import org.elasticsearch.common.compress.lzf.LZFEncoder;
|
||||
@ -30,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.mapper.MergeMappingException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
@ -80,6 +82,12 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
||||
}
|
||||
}
|
||||
|
||||
private ThreadLocal<ArrayDeque<Field>> fieldCache = new ThreadLocal<ArrayDeque<Field>>() {
|
||||
@Override protected ArrayDeque<Field> initialValue() {
|
||||
return new ArrayDeque<Field>();
|
||||
}
|
||||
};
|
||||
|
||||
private final boolean enabled;
|
||||
|
||||
private Boolean compress;
|
||||
@ -126,7 +134,19 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
||||
context.source(data);
|
||||
}
|
||||
}
|
||||
return new Field(names.indexName(), data);
|
||||
ArrayDeque<Field> cache = fieldCache.get();
|
||||
Field field = cache.poll();
|
||||
if (field == null) {
|
||||
field = new Field(names().indexName(), Bytes.EMPTY_ARRAY);
|
||||
}
|
||||
field.setValue(data);
|
||||
return field;
|
||||
}
|
||||
|
||||
@Override public void processFieldAfterIndex(Fieldable field) {
|
||||
Field field1 = (Field) field;
|
||||
field1.setValue(Bytes.EMPTY_ARRAY);
|
||||
fieldCache.get().add(field1);
|
||||
}
|
||||
|
||||
@Override public byte[] value(Document document) {
|
||||
@ -181,6 +201,10 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
fieldCache.remove();
|
||||
}
|
||||
|
||||
@Override protected String contentType() {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
|
@ -89,6 +89,15 @@ public class StringFieldMapper extends AbstractFieldMapper<String> implements In
|
||||
}
|
||||
}
|
||||
|
||||
static class FieldWrapper {
|
||||
public Field field;
|
||||
}
|
||||
|
||||
private ThreadLocal<FieldWrapper> fieldCache = new ThreadLocal<FieldWrapper>() {
|
||||
@Override protected FieldWrapper initialValue() {
|
||||
return new FieldWrapper();
|
||||
}
|
||||
};
|
||||
|
||||
private String nullValue;
|
||||
|
||||
@ -147,7 +156,28 @@ public class StringFieldMapper extends AbstractFieldMapper<String> implements In
|
||||
context.ignoredValue(names.indexName(), value);
|
||||
return null;
|
||||
}
|
||||
return new Field(names.indexName(), value, store, index, termVector);
|
||||
FieldWrapper fieldWrapper = fieldCache.get();
|
||||
Field field = fieldWrapper.field;
|
||||
if (field == null) {
|
||||
field = new Field(names.indexName(), false, value, store, index, termVector);
|
||||
} else {
|
||||
field.setValue(value);
|
||||
fieldWrapper.field = null;
|
||||
}
|
||||
return field;
|
||||
}
|
||||
|
||||
@Override public void processFieldAfterIndex(Fieldable field) {
|
||||
FieldWrapper fieldWrapper = fieldCache.get();
|
||||
if (fieldWrapper.field == null) {
|
||||
Field field1 = (Field) field;
|
||||
field1.setValue("");
|
||||
fieldWrapper.field = field1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
fieldCache.remove();
|
||||
}
|
||||
|
||||
@Override protected String contentType() {
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.mapper.MergeMappingException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
@ -61,6 +62,12 @@ public class TypeFieldMapper extends AbstractFieldMapper<String> implements org.
|
||||
}
|
||||
}
|
||||
|
||||
private final ThreadLocal<ArrayDeque<Field>> fieldCache = new ThreadLocal<ArrayDeque<Field>>() {
|
||||
@Override protected ArrayDeque<Field> initialValue() {
|
||||
return new ArrayDeque<Field>();
|
||||
}
|
||||
};
|
||||
|
||||
protected TypeFieldMapper() {
|
||||
this(Defaults.NAME, Defaults.INDEX_NAME);
|
||||
}
|
||||
@ -102,7 +109,21 @@ public class TypeFieldMapper extends AbstractFieldMapper<String> implements org.
|
||||
}
|
||||
|
||||
@Override protected Field parseCreateField(ParseContext context) throws IOException {
|
||||
return new Field(names.indexName(), context.type(), store, index);
|
||||
ArrayDeque<Field> cache = fieldCache.get();
|
||||
Field field = cache.poll();
|
||||
if (field == null) {
|
||||
field = new Field(names.indexName(), "", store, index);
|
||||
}
|
||||
field.setValue(context.type());
|
||||
return field;
|
||||
}
|
||||
|
||||
@Override public void processFieldAfterIndex(Fieldable field) {
|
||||
fieldCache.get().add((Field) field);
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
fieldCache.remove();
|
||||
}
|
||||
|
||||
@Override protected String contentType() {
|
||||
|
@ -59,6 +59,12 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements org.elas
|
||||
}
|
||||
}
|
||||
|
||||
private ThreadLocal<UidField> fieldCache = new ThreadLocal<UidField>() {
|
||||
@Override protected UidField initialValue() {
|
||||
return new UidField(names().indexName(), "", 0);
|
||||
}
|
||||
};
|
||||
|
||||
protected UidFieldMapper() {
|
||||
this(Defaults.NAME);
|
||||
}
|
||||
@ -77,7 +83,9 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements org.elas
|
||||
throw new MapperParsingException("No id found while parsing the content source");
|
||||
}
|
||||
context.uid(Uid.createUid(context.stringBuilder(), context.type(), context.id()));
|
||||
return new UidField(names().indexName(), context.uid(), 0); // version get updated by the engine
|
||||
UidField field = fieldCache.get();
|
||||
field.setUid(context.uid());
|
||||
return field; // version get updated by the engine
|
||||
}
|
||||
|
||||
@Override public Uid value(Fieldable field) {
|
||||
@ -104,6 +112,10 @@ public class UidFieldMapper extends AbstractFieldMapper<Uid> implements org.elas
|
||||
return new Term(names.indexName(), uid);
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
fieldCache.remove();
|
||||
}
|
||||
|
||||
@Override protected String contentType() {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper.xcontent;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Preconditions;
|
||||
@ -517,6 +518,16 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void processDocumentAfterIndex(Document doc) {
|
||||
for (Fieldable field : doc.getFields()) {
|
||||
FieldMappers fieldMappers = mappers().indexName(field.name());
|
||||
FieldMapper mapper = fieldMappers.mapper();
|
||||
if (mapper != null) {
|
||||
mapper.processFieldAfterIndex(field);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public synchronized MergeResult merge(DocumentMapper mergeWith, MergeFlags mergeFlags) {
|
||||
XContentDocumentMapper xContentMergeWith = (XContentDocumentMapper) mergeWith;
|
||||
MergeContext mergeContext = new MergeContext(this, mergeFlags);
|
||||
@ -548,6 +559,17 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
rootObjectMapper.close();
|
||||
idFieldMapper.close();
|
||||
indexFieldMapper.close();
|
||||
typeFieldMapper.close();
|
||||
allFieldMapper.close();
|
||||
analyzerMapper.close();
|
||||
sourceFieldMapper.close();
|
||||
sizeFieldMapper.close();
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
rootObjectMapper.toXContent(builder, params, new ToXContent() {
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
|
@ -99,4 +99,6 @@ public interface XContentMapper extends ToXContent {
|
||||
void merge(XContentMapper mergeWith, MergeContext mergeContext) throws MergeMappingException;
|
||||
|
||||
void traverse(FieldMapperListener fieldMapperListener);
|
||||
|
||||
void close();
|
||||
}
|
||||
|
@ -339,6 +339,11 @@ public class GeoPointFieldMapper implements XContentMapper, ArrayValueMapperPars
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
latMapper.close();
|
||||
lonMapper.close();
|
||||
}
|
||||
|
||||
@Override public void merge(XContentMapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
|
||||
// TODO
|
||||
}
|
||||
|
@ -308,6 +308,10 @@ public class PercolatorExecutor extends AbstractIndexComponent {
|
||||
if (!field.isIndexed()) {
|
||||
continue;
|
||||
}
|
||||
// no need to index the UID field
|
||||
if (field.name().equals(UidFieldMapper.NAME)) {
|
||||
continue;
|
||||
}
|
||||
TokenStream tokenStream = field.tokenStreamValue();
|
||||
if (tokenStream != null) {
|
||||
memoryIndex.addField(field.name(), tokenStream, field.getBoost());
|
||||
|
@ -256,7 +256,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
@Override public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||
ParsedDocument doc = docMapper.parse(source);
|
||||
return new Engine.Create(docMapper.uidMapper().term(doc.uid()), doc);
|
||||
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid()), doc);
|
||||
}
|
||||
|
||||
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
|
||||
@ -276,7 +276,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
@Override public Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException {
|
||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||
ParsedDocument doc = docMapper.parse(source);
|
||||
return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc);
|
||||
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid()), doc);
|
||||
}
|
||||
|
||||
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
|
||||
|
@ -46,6 +46,7 @@ import org.elasticsearch.index.engine.IndexEngine;
|
||||
import org.elasticsearch.index.engine.IndexEngineModule;
|
||||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.gateway.IndexGatewayModule;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.MapperServiceModule;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.percolator.PercolatorModule;
|
||||
@ -300,6 +301,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
||||
indexInjector.getInstance(IndexServiceManagement.class).close();
|
||||
|
||||
indexInjector.getInstance(IndexGateway.class).close(delete);
|
||||
indexInjector.getInstance(MapperService.class).close();
|
||||
|
||||
Injectors.close(injector);
|
||||
|
||||
|
@ -58,12 +58,22 @@ public class UidFieldTests {
|
||||
assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(1l));
|
||||
|
||||
doc = new Document();
|
||||
doc.add(new UidField("_uid", "1", 2));
|
||||
UidField uid = new UidField("_uid", "1", 2);
|
||||
doc.add(uid);
|
||||
writer.updateDocument(new Term("_uid", "1"), doc);
|
||||
reader = reader.reopen();
|
||||
assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(2l));
|
||||
assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(2l));
|
||||
|
||||
// test reuse of uid field
|
||||
doc = new Document();
|
||||
uid.version(3);
|
||||
doc.add(uid);
|
||||
writer.updateDocument(new Term("_uid", "1"), doc);
|
||||
reader = reader.reopen();
|
||||
assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(3l));
|
||||
assertThat(UidField.loadDocIdAndVersion(reader, new Term("_uid", "1")).version, equalTo(3l));
|
||||
|
||||
writer.deleteDocuments(new Term("_uid", "1"));
|
||||
reader = reader.reopen();
|
||||
assertThat(UidField.loadVersion(reader, new Term("_uid", "1")), equalTo(-1l));
|
||||
|
@ -137,7 +137,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// create a document
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -156,7 +156,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// now do an update
|
||||
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.index(new Engine.Index(newUid("1"), doc));
|
||||
engine.index(new Engine.Index(null, newUid("1"), doc));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.searcher();
|
||||
@ -195,7 +195,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// add it back
|
||||
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -220,7 +220,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
// make sure we can still work with the engine
|
||||
// now do an update
|
||||
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.index(new Engine.Index(newUid("1"), doc));
|
||||
engine.index(new Engine.Index(null, newUid("1"), doc));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.searcher();
|
||||
@ -248,7 +248,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// create a document
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.searcher();
|
||||
@ -281,7 +281,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
@Test public void testSimpleSnapshot() throws Exception {
|
||||
// create a document
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc1));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc1));
|
||||
|
||||
final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
@ -297,10 +297,10 @@ public abstract class AbstractSimpleEngineTests {
|
||||
@Override public Object call() throws Exception {
|
||||
engine.flush(new Engine.Flush());
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
|
||||
engine.create(new Engine.Create(newUid("2"), doc2));
|
||||
engine.create(new Engine.Create(null, newUid("2"), doc2));
|
||||
engine.flush(new Engine.Flush());
|
||||
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
|
||||
engine.create(new Engine.Create(newUid("3"), doc3));
|
||||
engine.create(new Engine.Create(null, newUid("3"), doc3));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
@ -335,7 +335,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testSimpleRecover() throws Exception {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@ -376,10 +376,10 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc1));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc1));
|
||||
engine.flush(new Engine.Flush());
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
|
||||
engine.create(new Engine.Create(newUid("2"), doc2));
|
||||
engine.create(new Engine.Create(null, newUid("2"), doc2));
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
|
||||
@ -403,10 +403,10 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
|
||||
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
engine.create(new Engine.Create(newUid("1"), doc1));
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc1));
|
||||
engine.flush(new Engine.Flush());
|
||||
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
|
||||
engine.create(new Engine.Create(newUid("2"), doc2));
|
||||
engine.create(new Engine.Create(null, newUid("2"), doc2));
|
||||
|
||||
engine.recover(new Engine.RecoveryHandler() {
|
||||
@Override public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
|
||||
@ -420,7 +420,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// add for phase3
|
||||
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
|
||||
engine.create(new Engine.Create(newUid("3"), doc3));
|
||||
engine.create(new Engine.Create(null, newUid("3"), doc3));
|
||||
}
|
||||
|
||||
@Override public void phase3(Translog.Snapshot snapshot) throws EngineException {
|
||||
@ -437,59 +437,59 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningNewCreate() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
|
||||
engine.create(create);
|
||||
assertThat(create.version(), equalTo(1l));
|
||||
|
||||
create = new Engine.Create(newUid("1"), doc).version(create.version()).origin(REPLICA);
|
||||
create = new Engine.Create(null, newUid("1"), doc).version(create.version()).origin(REPLICA);
|
||||
replicaEngine.create(create);
|
||||
assertThat(create.version(), equalTo(1l));
|
||||
}
|
||||
|
||||
@Test public void testExternalVersioningNewCreate() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
engine.create(create);
|
||||
assertThat(create.version(), equalTo(12l));
|
||||
|
||||
create = new Engine.Create(newUid("1"), doc).version(create.version()).origin(REPLICA);
|
||||
create = new Engine.Create(null, newUid("1"), doc).version(create.version()).origin(REPLICA);
|
||||
replicaEngine.create(create);
|
||||
assertThat(create.version(), equalTo(12l));
|
||||
}
|
||||
|
||||
@Test public void testVersioningNewIndex() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).version(index.version()).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(index.version()).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
}
|
||||
|
||||
@Test public void testExternalVersioningNewIndex() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(12l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).version(index.version()).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(index.version()).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(12l));
|
||||
}
|
||||
|
||||
@Test public void testVersioningIndexConflict() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).version(1l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(1l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -498,7 +498,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
// future versions should not work as well
|
||||
index = new Engine.Index(newUid("1"), doc).version(3l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(3l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -509,15 +509,15 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testExternalVersioningIndexConflict() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(12l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14);
|
||||
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(14l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -528,17 +528,17 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningIndexConflictWithFlush() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).version(1l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(1l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -547,7 +547,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
// future versions should not work as well
|
||||
index = new Engine.Index(newUid("1"), doc).version(3l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(3l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -558,17 +558,17 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testExternalVersioningIndexConflictWithFlush() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(12l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14);
|
||||
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(14l));
|
||||
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13);
|
||||
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -579,11 +579,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningDeleteConflict() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
@ -610,7 +610,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
assertThat(delete.version(), equalTo(3l));
|
||||
|
||||
// now check if we can index to a delete doc with version
|
||||
index = new Engine.Index(newUid("1"), doc).version(2l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(2l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -619,7 +619,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
// we shouldn't be able to create as well
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc).version(2l);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l);
|
||||
try {
|
||||
engine.create(create);
|
||||
} catch (VersionConflictEngineException e) {
|
||||
@ -629,11 +629,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningDeleteConflictWithFlush() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
@ -666,7 +666,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
// now check if we can index to a delete doc with version
|
||||
index = new Engine.Index(newUid("1"), doc).version(2l);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(2l);
|
||||
try {
|
||||
engine.index(index);
|
||||
assert false;
|
||||
@ -675,7 +675,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
}
|
||||
|
||||
// we shouldn't be able to create as well
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc).version(2l);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l);
|
||||
try {
|
||||
engine.create(create);
|
||||
} catch (VersionConflictEngineException e) {
|
||||
@ -685,11 +685,11 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningCreateExistsException() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
|
||||
engine.create(create);
|
||||
assertThat(create.version(), equalTo(1l));
|
||||
|
||||
create = new Engine.Create(newUid("1"), doc);
|
||||
create = new Engine.Create(null, newUid("1"), doc);
|
||||
try {
|
||||
engine.create(create);
|
||||
assert false;
|
||||
@ -700,13 +700,13 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningCreateExistsExceptionWithFlush() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Create create = new Engine.Create(newUid("1"), doc);
|
||||
Engine.Create create = new Engine.Create(null, newUid("1"), doc);
|
||||
engine.create(create);
|
||||
assertThat(create.version(), equalTo(1l));
|
||||
|
||||
engine.flush(new Engine.Flush());
|
||||
|
||||
create = new Engine.Create(newUid("1"), doc);
|
||||
create = new Engine.Create(null, newUid("1"), doc);
|
||||
try {
|
||||
engine.create(create);
|
||||
assert false;
|
||||
@ -717,21 +717,21 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningReplicaConflict1() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
// apply the second index to the replica, should work fine
|
||||
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
// now, the old one should not work
|
||||
index = new Engine.Index(newUid("1"), doc).version(1l).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(1l).origin(REPLICA);
|
||||
try {
|
||||
replicaEngine.index(index);
|
||||
assert false;
|
||||
@ -741,7 +741,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// second version on replica should fail as well
|
||||
try {
|
||||
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
} catch (VersionConflictEngineException e) {
|
||||
@ -751,17 +751,17 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
@Test public void testVersioningReplicaConflict2() {
|
||||
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc);
|
||||
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
// apply the first index to the replica, should work fine
|
||||
index = new Engine.Index(newUid("1"), doc).version(1l).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(1l).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(1l));
|
||||
|
||||
// index it again
|
||||
index = new Engine.Index(newUid("1"), doc);
|
||||
index = new Engine.Index(null, newUid("1"), doc);
|
||||
engine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
|
||||
@ -786,7 +786,7 @@ public abstract class AbstractSimpleEngineTests {
|
||||
|
||||
// now do the second index on the replica, it should fail
|
||||
try {
|
||||
index = new Engine.Index(newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(2l));
|
||||
} catch (VersionConflictEngineException e) {
|
||||
|
@ -284,6 +284,14 @@ public class AttachmentMapper implements XContentMapper {
|
||||
keywordsMapper.traverse(fieldMapperListener);
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
contentMapper.close();
|
||||
dateMapper.close();
|
||||
titleMapper.close();
|
||||
authorMapper.close();
|
||||
keywordsMapper.close();
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(name);
|
||||
builder.field("type", CONTENT_TYPE);
|
||||
|
Loading…
x
Reference in New Issue
Block a user