add _routing to documetn indexed when providing routing value

This commit is contained in:
kimchy 2010-11-14 21:23:52 +02:00
parent 9505cb14f1
commit 8ee038574d
16 changed files with 382 additions and 65 deletions

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -104,10 +105,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (item.request() instanceof IndexRequest) { if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request(); IndexRequest indexRequest = (IndexRequest) item.request();
try { try {
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()).routing(indexRequest.routing());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(indexRequest.type(), indexRequest.id(), indexRequest.source()); ops[i] = indexShard.prepareIndex(sourceToParse);
} else { } else {
ops[i] = indexShard.prepareCreate(indexRequest.type(), indexRequest.id(), indexRequest.source()); ops[i] = indexShard.prepareCreate(sourceToParse);
} }
} catch (Exception e) { } catch (Exception e) {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(), responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
@ -185,10 +187,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (item.request() instanceof IndexRequest) { if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request(); IndexRequest indexRequest = (IndexRequest) item.request();
try { try {
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()).routing(indexRequest.routing());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(indexRequest.type(), indexRequest.id(), indexRequest.source()); ops[i] = indexShard.prepareIndex(sourceToParse);
} else { } else {
ops[i] = indexShard.prepareCreate(indexRequest.type(), indexRequest.id(), indexRequest.source()); ops[i] = indexShard.prepareCreate(sourceToParse);
} }
} catch (Exception e) { } catch (Exception e) {
// ignore, we are on backup // ignore, we are on backup

View File

@ -39,6 +39,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -135,13 +136,14 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { @Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
final IndexRequest request = shardRequest.request; final IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()).routing(request.routing());
ParsedDocument doc; ParsedDocument doc;
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source()); Engine.Index index = indexShard.prepareIndex(sourceToParse);
index.refresh(request.refresh()); index.refresh(request.refresh());
doc = indexShard.index(index); doc = indexShard.index(index);
} else { } else {
Engine.Create create = indexShard.prepareCreate(request.type(), request.id(), request.source()); Engine.Create create = indexShard.prepareCreate(sourceToParse);
create.refresh(request.refresh()); create.refresh(request.refresh());
doc = indexShard.create(create); doc = indexShard.create(create);
} }
@ -154,12 +156,13 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) { @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
IndexRequest request = shardRequest.request; IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()).routing(request.routing());
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source()); Engine.Index index = indexShard.prepareIndex(sourceToParse);
index.refresh(request.refresh()); index.refresh(request.refresh());
indexShard.index(index); indexShard.index(index);
} else { } else {
Engine.Create create = indexShard.prepareCreate(request.type(), request.id(), request.source()); Engine.Create create = indexShard.prepareCreate(sourceToParse);
create.refresh(request.refresh()); create.refresh(request.refresh());
indexShard.create(create); indexShard.create(create);
} }

View File

@ -198,7 +198,7 @@ public class TransportMoreLikeThisAction extends BaseAction<MoreLikeThisRequest,
if (getResponse.source() == null) { if (getResponse.source() == null) {
return; return;
} }
docMapper.parse(request.type(), request.id(), getResponse.source(), new DocumentMapper.ParseListenerAdapter() { docMapper.parse(SourceToParse.source(getResponse.source()).type(request.type()).id(request.id()), new DocumentMapper.ParseListenerAdapter() {
@Override public boolean beforeFieldAdded(FieldMapper fieldMapper, Fieldable field, Object parseContext) { @Override public boolean beforeFieldAdded(FieldMapper fieldMapper, Fieldable field, Object parseContext) {
if (fieldMapper instanceof InternalMapper) { if (fieldMapper instanceof InternalMapper) {
return true; return true;

View File

@ -91,7 +91,7 @@ public interface DocumentMapper {
* <p>Validates that the source has the provided id and type. Note, most times * <p>Validates that the source has the provided id and type. Note, most times
* we will already have the id and the type even though they exist in the source as well. * we will already have the id and the type even though they exist in the source as well.
*/ */
ParsedDocument parse(@Nullable String type, @Nullable String id, byte[] source) throws MapperParsingException; ParsedDocument parse(byte[] source) throws MapperParsingException;
/** /**
* Parses the source into a parsed document. * Parses the source into a parsed document.
@ -99,12 +99,23 @@ public interface DocumentMapper {
* <p>Validates that the source has the provided id and type. Note, most times * <p>Validates that the source has the provided id and type. Note, most times
* we will already have the id and the type even though they exist in the source as well. * we will already have the id and the type even though they exist in the source as well.
*/ */
ParsedDocument parse(@Nullable String type, @Nullable String id, byte[] source, @Nullable ParseListener listener) throws MapperParsingException; ParsedDocument parse(String type, String id, byte[] source) throws MapperParsingException;
/** /**
* Parses the source into the parsed document. * Parses the source into a parsed document.
*
* <p>Validates that the source has the provided id and type. Note, most times
* we will already have the id and the type even though they exist in the source as well.
*/ */
ParsedDocument parse(byte[] source) throws MapperParsingException; ParsedDocument parse(SourceToParse source) throws MapperParsingException;
/**
* Parses the source into a parsed document.
*
* <p>Validates that the source has the provided id and type. Note, most times
* we will already have the id and the type even though they exist in the source as well.
*/
ParsedDocument parse(SourceToParse source, @Nullable ParseListener listener) throws MapperParsingException;
/** /**
* Merges this document mapper with the provided document mapper. If there are conflicts, the * Merges this document mapper with the provided document mapper. If there are conflicts, the

View File

@ -23,6 +23,8 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
/** /**
* The result of parsing a document.
*
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class ParsedDocument { public class ParsedDocument {

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper;
import org.apache.lucene.document.Document;
/**
* @author kimchy (shay.banon)
*/
public interface RoutingFieldMapper extends FieldMapper<String>, InternalMapper {
String value(Document document);
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper;
/**
* @author kimchy (shay.banon)
*/
public class SourceToParse {
public static SourceToParse source(byte[] source) {
return new SourceToParse(source);
}
private final byte[] source;
private String type;
private String id;
private String routing;
public SourceToParse(byte[] source) {
this.source = source;
}
public byte[] source() {
return this.source;
}
public String type() {
return this.type;
}
public SourceToParse type(String type) {
this.type = type;
return this;
}
public String id() {
return this.id;
}
public SourceToParse id(String id) {
this.id = id;
return this;
}
public String routing() {
return this.routing;
}
public SourceToParse routing(String routing) {
this.routing = routing;
return this;
}
}

View File

@ -90,7 +90,7 @@ public class ParseContext {
this.path.reset(); this.path.reset();
this.parsedIdState = ParsedIdState.NO; this.parsedIdState = ParsedIdState.NO;
this.mappersAdded = false; this.mappersAdded = false;
this.listener = listener; this.listener = listener == null ? DocumentMapper.ParseListener.EMPTY : listener;
this.allEntries = new AllEntries(); this.allEntries = new AllEntries();
this.ignoredValues.clear(); this.ignoredValues.clear();
} }

View File

@ -0,0 +1,129 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.xcontent;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.MergeMappingException;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class RoutingFieldMapper extends AbstractFieldMapper<String> implements org.elasticsearch.index.mapper.RoutingFieldMapper {
public static final String CONTENT_TYPE = "_routing";
public static class Defaults extends AbstractFieldMapper.Defaults {
public static final String NAME = "_routing";
public static final String INDEX_NAME = "_routing";
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
public static final Field.Store STORE = Field.Store.YES;
public static final boolean OMIT_NORMS = true;
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true;
}
public static class Builder extends AbstractFieldMapper.Builder<Builder, RoutingFieldMapper> {
public Builder() {
super(Defaults.NAME);
indexName = Defaults.INDEX_NAME;
store = Defaults.STORE;
index = Defaults.INDEX;
omitNorms = Defaults.OMIT_NORMS;
omitTermFreqAndPositions = Defaults.OMIT_TERM_FREQ_AND_POSITIONS;
}
@Override public RoutingFieldMapper build(BuilderContext context) {
return new RoutingFieldMapper(name, indexName, store, termVector, boost, omitNorms, omitTermFreqAndPositions);
}
}
protected RoutingFieldMapper() {
this(Defaults.NAME, Defaults.INDEX_NAME);
}
protected RoutingFieldMapper(String name, String indexName) {
this(name, indexName, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS);
}
protected RoutingFieldMapper(String name, String indexName, Field.Store store, Field.TermVector termVector,
float boost, boolean omitNorms, boolean omitTermFreqAndPositions) {
super(new Names(name, indexName, indexName, name), Defaults.INDEX, store, termVector, boost, omitNorms, omitTermFreqAndPositions,
Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
}
@Override public String value(Document document) {
Fieldable field = document.getFieldable(names.indexName());
return field == null ? null : value(field);
}
@Override public String value(Fieldable field) {
return field.stringValue();
}
@Override public String valueFromString(String value) {
return value;
}
@Override public String valueAsString(Fieldable field) {
return value(field);
}
@Override public String indexedValue(String value) {
return value;
}
@Override protected Field parseCreateField(ParseContext context) throws IOException {
if (context.externalValueSet()) {
String routing = (String) context.externalValue();
if (routing != null) {
return new Field(names.indexName(), routing, store, index);
}
}
return null;
}
@Override protected String contentType() {
return CONTENT_TYPE;
}
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
// if all are defaults, no sense to write it at all
if (index == Defaults.INDEX) {
return;
}
builder.startObject(CONTENT_TYPE);
if (index != Defaults.INDEX) {
builder.field("index", index.name().toLowerCase());
}
builder.endObject();
}
@Override public void merge(XContentMapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
// do nothing here, no merging, but also no exception
}
}

View File

@ -54,6 +54,8 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
private SourceFieldMapper sourceFieldMapper = new SourceFieldMapper(); private SourceFieldMapper sourceFieldMapper = new SourceFieldMapper();
private RoutingFieldMapper routingFieldMapper = new RoutingFieldMapper();
private BoostFieldMapper boostFieldMapper = new BoostFieldMapper(); private BoostFieldMapper boostFieldMapper = new BoostFieldMapper();
private AllFieldMapper allFieldMapper = new AllFieldMapper(); private AllFieldMapper allFieldMapper = new AllFieldMapper();
@ -107,6 +109,11 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
return this; return this;
} }
public Builder routingField(RoutingFieldMapper.Builder builder) {
this.routingFieldMapper = builder.build(builderContext);
return this;
}
public Builder boostField(BoostFieldMapper.Builder builder) { public Builder boostField(BoostFieldMapper.Builder builder) {
this.boostFieldMapper = builder.build(builderContext); this.boostFieldMapper = builder.build(builderContext);
return this; return this;
@ -143,7 +150,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
public XContentDocumentMapper build(XContentDocumentMapperParser docMapperParser) { public XContentDocumentMapper build(XContentDocumentMapperParser docMapperParser) {
Preconditions.checkNotNull(rootObjectMapper, "Mapper builder must have the root object mapper set"); Preconditions.checkNotNull(rootObjectMapper, "Mapper builder must have the root object mapper set");
return new XContentDocumentMapper(index, docMapperParser, rootObjectMapper, attributes, uidFieldMapper, idFieldMapper, typeFieldMapper, indexFieldMapper, return new XContentDocumentMapper(index, docMapperParser, rootObjectMapper, attributes, uidFieldMapper, idFieldMapper, typeFieldMapper, indexFieldMapper,
sourceFieldMapper, allFieldMapper, analyzerMapper, indexAnalyzer, searchAnalyzer, boostFieldMapper); sourceFieldMapper, routingFieldMapper, allFieldMapper, analyzerMapper, indexAnalyzer, searchAnalyzer, boostFieldMapper);
} }
} }
@ -174,6 +181,8 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
private final SourceFieldMapper sourceFieldMapper; private final SourceFieldMapper sourceFieldMapper;
private final RoutingFieldMapper routingFieldMapper;
private final BoostFieldMapper boostFieldMapper; private final BoostFieldMapper boostFieldMapper;
private final AllFieldMapper allFieldMapper; private final AllFieldMapper allFieldMapper;
@ -202,6 +211,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
TypeFieldMapper typeFieldMapper, TypeFieldMapper typeFieldMapper,
IndexFieldMapper indexFieldMapper, IndexFieldMapper indexFieldMapper,
SourceFieldMapper sourceFieldMapper, SourceFieldMapper sourceFieldMapper,
RoutingFieldMapper routingFieldMapper,
AllFieldMapper allFieldMapper, AllFieldMapper allFieldMapper,
AnalyzerMapper analyzerMapper, AnalyzerMapper analyzerMapper,
NamedAnalyzer indexAnalyzer, NamedAnalyzer searchAnalyzer, NamedAnalyzer indexAnalyzer, NamedAnalyzer searchAnalyzer,
@ -216,6 +226,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
this.typeFieldMapper = typeFieldMapper; this.typeFieldMapper = typeFieldMapper;
this.indexFieldMapper = indexFieldMapper; this.indexFieldMapper = indexFieldMapper;
this.sourceFieldMapper = sourceFieldMapper; this.sourceFieldMapper = sourceFieldMapper;
this.routingFieldMapper = routingFieldMapper;
this.allFieldMapper = allFieldMapper; this.allFieldMapper = allFieldMapper;
this.analyzerMapper = analyzerMapper; this.analyzerMapper = analyzerMapper;
this.boostFieldMapper = boostFieldMapper; this.boostFieldMapper = boostFieldMapper;
@ -234,6 +245,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
if (boostFieldMapper != null) { if (boostFieldMapper != null) {
rootObjectMapper.putMapper(boostFieldMapper); rootObjectMapper.putMapper(boostFieldMapper);
} }
rootObjectMapper.putMapper(routingFieldMapper);
final List<FieldMapper> tempFieldMappers = newArrayList(); final List<FieldMapper> tempFieldMappers = newArrayList();
// add the basic ones // add the basic ones
@ -316,26 +328,30 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
return this.fieldMappers; return this.fieldMappers;
} }
@Override public ParsedDocument parse(byte[] source) { @Override public ParsedDocument parse(byte[] source) throws MapperParsingException {
return parse(null, null, source); return parse(SourceToParse.source(source));
} }
@Override public ParsedDocument parse(@Nullable String type, @Nullable String id, byte[] source) throws MapperParsingException { @Override public ParsedDocument parse(String type, String id, byte[] source) throws MapperParsingException {
return parse(type, id, source, ParseListener.EMPTY); return parse(SourceToParse.source(source).type(type).id(id));
} }
@Override public ParsedDocument parse(String type, String id, byte[] source, ParseListener listener) { @Override public ParsedDocument parse(SourceToParse source) throws MapperParsingException {
return parse(source, null);
}
@Override public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listener) throws MapperParsingException {
ParseContext context = cache.get().get(); ParseContext context = cache.get().get();
if (type != null && !type.equals(this.type)) { if (source.type() != null && !source.type().equals(this.type)) {
throw new MapperParsingException("Type mismatch, provide type [" + type + "] but mapper is of type [" + this.type + "]"); throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + this.type + "]");
} }
type = this.type; source.type(this.type);
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentFactory.xContent(source).createParser(source); parser = XContentFactory.xContent(source.source()).createParser(source.source());
context.reset(parser, new Document(), type, source, listener); context.reset(parser, new Document(), type, source.source(), listener);
// will result in START_OBJECT // will result in START_OBJECT
XContentParser.Token token = parser.nextToken(); XContentParser.Token token = parser.nextToken();
@ -363,18 +379,22 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
sourceFieldMapper.parse(context); sourceFieldMapper.parse(context);
} }
// set the id if we have it so we can validate it later on, also, add the uid if we can // set the id if we have it so we can validate it later on, also, add the uid if we can
if (id != null) { if (source.id() != null) {
context.id(id); context.id(source.id());
uidFieldMapper.parse(context); uidFieldMapper.parse(context);
} }
typeFieldMapper.parse(context); typeFieldMapper.parse(context);
if (source.routing() != null) {
context.externalValue(source.routing());
routingFieldMapper.parse(context);
}
indexFieldMapper.parse(context); indexFieldMapper.parse(context);
rootObjectMapper.parse(context); rootObjectMapper.parse(context);
// if we did not get the id, we need to parse the uid into the document now, after it was added // if we did not get the id, we need to parse the uid into the document now, after it was added
if (id == null) { if (source.id() == null) {
uidFieldMapper.parse(context); uidFieldMapper.parse(context);
} }
if (context.parsedIdState() != ParseContext.ParsedIdState.PARSED) { if (context.parsedIdState() != ParseContext.ParsedIdState.PARSED) {
@ -391,7 +411,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
parser.close(); parser.close();
} }
} }
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), context.doc(), context.analyzer(), source, context.mappersAdded()); ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), context.doc(), context.analyzer(), source.source(), context.mappersAdded());
// reset the context to free up memory // reset the context to free up memory
context.reset(null, null, null, null, null); context.reset(null, null, null, null, null);
return doc; return doc;

View File

@ -145,6 +145,8 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
docBuilder.typeField(parseTypeField((Map<String, Object>) fieldNode, parserContext)); docBuilder.typeField(parseTypeField((Map<String, Object>) fieldNode, parserContext));
} else if (UidFieldMapper.CONTENT_TYPE.equals(fieldName) || "uidField".equals(fieldName)) { } else if (UidFieldMapper.CONTENT_TYPE.equals(fieldName) || "uidField".equals(fieldName)) {
docBuilder.uidField(parseUidField((Map<String, Object>) fieldNode, parserContext)); docBuilder.uidField(parseUidField((Map<String, Object>) fieldNode, parserContext));
} else if (RoutingFieldMapper.CONTENT_TYPE.equals(fieldName)) {
docBuilder.routingField(parseRoutingField((Map<String, Object>) fieldNode, parserContext));
} else if (BoostFieldMapper.CONTENT_TYPE.equals(fieldName) || "boostField".equals(fieldName)) { } else if (BoostFieldMapper.CONTENT_TYPE.equals(fieldName) || "boostField".equals(fieldName)) {
docBuilder.boostField(parseBoostField((Map<String, Object>) fieldNode, parserContext)); docBuilder.boostField(parseBoostField((Map<String, Object>) fieldNode, parserContext));
} else if (AllFieldMapper.CONTENT_TYPE.equals(fieldName) || "allField".equals(fieldName)) { } else if (AllFieldMapper.CONTENT_TYPE.equals(fieldName) || "allField".equals(fieldName)) {
@ -212,6 +214,12 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
return builder; return builder;
} }
private RoutingFieldMapper.Builder parseRoutingField(Map<String, Object> routingNode, XContentMapper.TypeParser.ParserContext parserContext) {
RoutingFieldMapper.Builder builder = routing();
parseField(builder, builder.name, routingNode, parserContext);
return builder;
}
private AnalyzerMapper.Builder parseAnalyzerField(Map<String, Object> analyzerNode, XContentMapper.TypeParser.ParserContext parserContext) { private AnalyzerMapper.Builder parseAnalyzerField(Map<String, Object> analyzerNode, XContentMapper.TypeParser.ParserContext parserContext) {
AnalyzerMapper.Builder builder = analyzer(); AnalyzerMapper.Builder builder = analyzer();
for (Map.Entry<String, Object> entry : analyzerNode.entrySet()) { for (Map.Entry<String, Object> entry : analyzerNode.entrySet()) {

View File

@ -40,6 +40,10 @@ public final class XContentMapperBuilders {
return new IdFieldMapper.Builder(); return new IdFieldMapper.Builder();
} }
public static RoutingFieldMapper.Builder routing() {
return new RoutingFieldMapper.Builder();
}
public static UidFieldMapper.Builder uid() { public static UidFieldMapper.Builder uid() {
return new UidFieldMapper.Builder(); return new UidFieldMapper.Builder();
} }

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -48,24 +49,18 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
*/ */
ByteSizeValue estimateFlushableMemorySize() throws ElasticSearchException; ByteSizeValue estimateFlushableMemorySize() throws ElasticSearchException;
Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException; Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;
ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException;
ParsedDocument create(Engine.Create create) throws ElasticSearchException; ParsedDocument create(Engine.Create create) throws ElasticSearchException;
Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException; Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException;
ParsedDocument index(Engine.Index index) throws ElasticSearchException; ParsedDocument index(Engine.Index index) throws ElasticSearchException;
ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException;
Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException; Engine.Delete prepareDelete(String type, String id) throws ElasticSearchException;
void delete(Engine.Delete delete) throws ElasticSearchException; void delete(Engine.Delete delete) throws ElasticSearchException;
void delete(String type, String id) throws ElasticSearchException;
void delete(Term uid) throws ElasticSearchException; void delete(Term uid) throws ElasticSearchException;
EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException; EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException;

View File

@ -40,10 +40,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.*; import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.DocumentMapperNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.query.IndexQueryParser; import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException; import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.IndexQueryParserService;
@ -60,6 +57,8 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.index.mapper.SourceToParse.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ -208,19 +207,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return engine.estimateFlushableMemorySize(); return engine.estimateFlushableMemorySize();
} }
@Override public Engine.Create prepareCreate(String type, String id, byte[] source) throws ElasticSearchException { @Override public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
DocumentMapper docMapper = mapperService.type(type); DocumentMapper docMapper = mapperService.type(source.type());
if (docMapper == null) { if (docMapper == null) {
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]"); throw new DocumentMapperNotFoundException("No mapper found for type [" + source.type() + "]");
} }
ParsedDocument doc = docMapper.parse(type, id, source); ParsedDocument doc = docMapper.parse(source);
return new Engine.Create(doc); return new Engine.Create(doc);
} }
@Override public ParsedDocument create(String type, String id, byte[] source) throws ElasticSearchException {
return create(prepareCreate(type, id, source));
}
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException { @Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
writeAllowed(); writeAllowed();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -230,19 +225,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return create.parsedDoc(); return create.parsedDoc();
} }
@Override public Engine.Index prepareIndex(String type, String id, byte[] source) throws ElasticSearchException { @Override public Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException {
DocumentMapper docMapper = mapperService.type(type); DocumentMapper docMapper = mapperService.type(source.type());
if (docMapper == null) { if (docMapper == null) {
throw new DocumentMapperNotFoundException("No mapper found for type [" + type + "]"); throw new DocumentMapperNotFoundException("No mapper found for type [" + source.type() + "]");
} }
ParsedDocument doc = docMapper.parse(type, id, source); ParsedDocument doc = docMapper.parse(source);
return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc); return new Engine.Index(docMapper.uidMapper().term(doc.uid()), doc);
} }
@Override public ParsedDocument index(String type, String id, byte[] source) throws ElasticSearchException {
return index(prepareIndex(type, id, source));
}
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException { @Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
writeAllowed(); writeAllowed();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -260,10 +251,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return new Engine.Delete(docMapper.uidMapper().term(type, id)); return new Engine.Delete(docMapper.uidMapper().term(type, id));
} }
@Override public void delete(String type, String id) {
delete(prepareDelete(type, id));
}
@Override public void delete(Term uid) { @Override public void delete(Term uid) {
delete(new Engine.Delete(uid)); delete(new Engine.Delete(uid));
} }
@ -474,11 +461,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
switch (operation.opType()) { switch (operation.opType()) {
case CREATE: case CREATE:
Translog.Create create = (Translog.Create) operation; Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(create.type(), create.id(), create.source())); engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())));
break; break;
case SAVE: case SAVE:
Translog.Index index = (Translog.Index) operation; Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(index.type(), index.id(), index.source())); engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())));
break; break;
case DELETE: case DELETE:
Translog.Delete delete = (Translog.Delete) operation; Translog.Delete delete = (Translog.Delete) operation;

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.NotThreadSafe; import org.elasticsearch.common.util.concurrent.NotThreadSafe;
import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -232,7 +233,7 @@ public interface Translog extends IndexShardComponent {
} }
@Override public void execute(IndexShard indexShard) throws ElasticSearchException { @Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.create(type, id, source); indexShard.create(indexShard.prepareCreate(SourceToParse.source(source).type(type).id(id)));
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
@ -291,7 +292,7 @@ public interface Translog extends IndexShardComponent {
} }
@Override public void execute(IndexShard indexShard) throws ElasticSearchException { @Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.index(type, id, source); indexShard.index(indexShard.prepareIndex(SourceToParse.source(source).type(type).id(id)));
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.xcontent.routing;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.xcontent.MapperTests;
import org.elasticsearch.index.mapper.xcontent.XContentDocumentMapper;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class RoutingTypeMapperTests {
@Test public void simpleRoutingMapperTests() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.endObject().endObject().string();
XContentDocumentMapper docMapper = MapperTests.newParser().parse(mapping);
ParsedDocument doc = docMapper.parse(SourceToParse.source(XContentFactory.jsonBuilder()
.startObject()
.field("field", "value")
.endObject()
.copiedBytes()).type("type").id("1").routing("routing_value"));
assertThat(doc.doc().get("_routing"), equalTo("routing_value"));
assertThat(doc.doc().get("field"), equalTo("value"));
}
}