Mapper: Store the routing (if provided) under a `_routing` field, closes #515.

This commit is contained in:
kimchy 2010-11-15 11:34:56 +02:00
parent 8ee038574d
commit 6d214d69b9
8 changed files with 87 additions and 62 deletions

View File

@ -164,7 +164,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId))
.add(field("content", contentItem)).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(pDoc));
} else {
@ -278,7 +278,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId))
.add(field("content", content(id))).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) {
engine.create(new Engine.Create(pDoc));
} else {

View File

@ -286,6 +286,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.id();
}
public String routing() {
return this.doc.routing();
}
public Document doc() {
return this.doc.doc();
}
@ -345,6 +349,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.type();
}
public String routing() {
return this.doc.routing();
}
public byte[] source() {
return this.doc.source();
}

View File

@ -35,6 +35,8 @@ public class ParsedDocument {
private final String type;
private final String routing;
private final Document document;
private final Analyzer analyzer;
@ -43,10 +45,11 @@ public class ParsedDocument {
private boolean mappersAdded;
public ParsedDocument(String uid, String id, String type, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
public ParsedDocument(String uid, String id, String type, String routing, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this.uid = uid;
this.id = id;
this.type = type;
this.routing = routing;
this.document = document;
this.source = source;
this.analyzer = analyzer;
@ -65,6 +68,10 @@ public class ParsedDocument {
return this.type;
}
public String routing() {
return this.routing;
}
public Document doc() {
return this.document;
}

View File

@ -37,7 +37,6 @@ public class RoutingFieldMapper extends AbstractFieldMapper<String> implements o
public static class Defaults extends AbstractFieldMapper.Defaults {
public static final String NAME = "_routing";
public static final String INDEX_NAME = "_routing";
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
public static final Field.Store STORE = Field.Store.YES;
public static final boolean OMIT_NORMS = true;
@ -48,30 +47,21 @@ public class RoutingFieldMapper extends AbstractFieldMapper<String> implements o
public Builder() {
super(Defaults.NAME);
indexName = Defaults.INDEX_NAME;
store = Defaults.STORE;
index = Defaults.INDEX;
omitNorms = Defaults.OMIT_NORMS;
omitTermFreqAndPositions = Defaults.OMIT_TERM_FREQ_AND_POSITIONS;
}
@Override public RoutingFieldMapper build(BuilderContext context) {
return new RoutingFieldMapper(name, indexName, store, termVector, boost, omitNorms, omitTermFreqAndPositions);
return new RoutingFieldMapper(store, index);
}
}
protected RoutingFieldMapper() {
this(Defaults.NAME, Defaults.INDEX_NAME);
this(Defaults.STORE, Defaults.INDEX);
}
protected RoutingFieldMapper(String name, String indexName) {
this(name, indexName, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS);
}
protected RoutingFieldMapper(String name, String indexName, Field.Store store, Field.TermVector termVector,
float boost, boolean omitNorms, boolean omitTermFreqAndPositions) {
super(new Names(name, indexName, indexName, name), Defaults.INDEX, store, termVector, boost, omitNorms, omitTermFreqAndPositions,
protected RoutingFieldMapper(Field.Store store, Field.Index index) {
super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), index, store, Defaults.TERM_VECTOR, 1.0f, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS,
Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
}
@ -100,6 +90,10 @@ public class RoutingFieldMapper extends AbstractFieldMapper<String> implements o
if (context.externalValueSet()) {
String routing = (String) context.externalValue();
if (routing != null) {
if (!indexed() && !stored()) {
context.ignoredValue(names.indexName(), routing);
return null;
}
return new Field(names.indexName(), routing, store, index);
}
}
@ -113,13 +107,16 @@ public class RoutingFieldMapper extends AbstractFieldMapper<String> implements o
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
// if all are defaults, no sense to write it at all
if (index == Defaults.INDEX) {
if (index == Defaults.INDEX && store == Defaults.STORE) {
return;
}
builder.startObject(CONTENT_TYPE);
if (index != Defaults.INDEX) {
builder.field("index", index.name().toLowerCase());
}
if (store != Defaults.STORE) {
builder.field("store", store.name().toLowerCase());
}
builder.endObject();
}

View File

@ -411,7 +411,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
parser.close();
}
}
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), context.doc(), context.analyzer(), source.source(), context.mappersAdded());
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), context.doc(), context.analyzer(), source.source(), context.mappersAdded());
// reset the context to free up memory
context.reset(null, null, null, null, null);
return doc;

View File

@ -461,11 +461,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())));
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id()).routing(create.routing())));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())));
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()).routing(index.routing())));
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -30,9 +29,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable;
import java.io.IOException;
@ -190,20 +187,20 @@ public interface Translog extends IndexShardComponent {
Type opType();
long estimateSize();
void execute(IndexShard indexShard) throws ElasticSearchException;
}
static class Create implements Operation {
private String id;
private String type;
private byte[] source;
private String routing;
public Create() {
}
public Create(Engine.Create create) {
this(create.type(), create.id(), create.source());
this.routing = create.routing();
}
public Create(String type, String id, byte[] source) {
@ -232,24 +229,35 @@ public interface Translog extends IndexShardComponent {
return this.type;
}
@Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.create(indexShard.prepareCreate(SourceToParse.source(source).type(type).id(id)));
public String routing() {
return this.routing;
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
source = new byte[in.readVInt()];
in.readFully(source);
if (version == 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0); // version
out.writeVInt(1); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
out.writeBytes(source);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
}
@ -257,12 +265,14 @@ public interface Translog extends IndexShardComponent {
private String id;
private String type;
private byte[] source;
private String routing;
public Index() {
}
public Index(Engine.Index index) {
this(index.type(), index.id(), index.source());
this.routing = index.routing();
}
public Index(String type, String id, byte[] source) {
@ -287,28 +297,39 @@ public interface Translog extends IndexShardComponent {
return this.id;
}
public String routing() {
return this.routing;
}
public byte[] source() {
return this.source;
}
@Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.index(indexShard.prepareIndex(SourceToParse.source(source).type(type).id(id)));
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
source = new byte[in.readVInt()];
in.readFully(source);
if (version == 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0); // version
out.writeVInt(1); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
out.writeBytes(source);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
}
@ -338,10 +359,6 @@ public interface Translog extends IndexShardComponent {
return this.uid;
}
@Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.delete(uid);
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
uid = new Term(in.readUTF(), in.readUTF());
@ -392,10 +409,6 @@ public interface Translog extends IndexShardComponent {
return this.types;
}
@Override public void execute(IndexShard indexShard) throws ElasticSearchException {
indexShard.deleteByQuery(source, queryParserName, types);
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
source = new byte[in.readVInt()];

View File

@ -118,7 +118,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
// its not there...
@ -137,7 +137,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// now do an update
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(newUid("1"), doc));
// its not updated yet...
@ -176,7 +176,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// add it back
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
// its not there...
@ -201,7 +201,7 @@ public abstract class AbstractSimpleEngineTests {
// make sure we can still work with the engine
// now do an update
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(newUid("1"), doc));
// its not updated yet...
@ -229,13 +229,13 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
List<Engine.Operation> ops = Lists.newArrayList();
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "1_test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "2_test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
doc = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "2_test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "3_test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
doc = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "3_test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ops.add(new Engine.Create(doc));
doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "1_test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "1_test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ops.add(new Engine.Index(newUid("1"), doc));
ops.add(new Engine.Delete(newUid("2")));
@ -261,7 +261,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release();
// create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
// its not there...
@ -294,7 +294,7 @@ public abstract class AbstractSimpleEngineTests {
@Test public void testSimpleSnapshot() throws Exception {
// create a document
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
final ExecutorService executorService = Executors.newCachedThreadPool();
@ -310,10 +310,10 @@ public abstract class AbstractSimpleEngineTests {
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception {
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
engine.flush(new Engine.Flush());
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(doc3));
return null;
}
@ -348,7 +348,7 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testSimpleRecover() throws Exception {
ParsedDocument doc = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc));
engine.flush(new Engine.Flush());
@ -389,10 +389,10 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
engine.recover(new Engine.RecoveryHandler() {
@ -416,10 +416,10 @@ public abstract class AbstractSimpleEngineTests {
}
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(field("_uid", "1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(doc1));
engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(field("_uid", "2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(doc2));
engine.recover(new Engine.RecoveryHandler() {
@ -433,7 +433,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(create.source(), equalTo(B_2));
// add for phase3
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(field("_uid", "3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(doc3));
}