Add support for timestamp field

This commit is contained in:
Benjamin Devèze 2011-08-26 11:06:10 +02:00 committed by Shay Banon
parent fe0a1d424d
commit bb02f19f88
23 changed files with 935 additions and 179 deletions

View File

@ -165,7 +165,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id); String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId)) Document doc = doc().add(field("_id", sId))
.add(field("content", contentItem)).build(); .add(field("content", contentItem)).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) { if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else { } else {
@ -279,7 +279,7 @@ public class SimpleEngineBenchmark {
String sId = Integer.toString(id); String sId = Integer.toString(id);
Document doc = doc().add(field("_id", sId)) Document doc = doc().add(field("_id", sId))
.add(field("content", content(id))).build(); .add(field("content", content(id))).build();
ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false); ParsedDocument pDoc = new ParsedDocument(sId, sId, "type", null, -1, doc, Lucene.STANDARD_ANALYZER, TRANSLOG_PAYLOAD, false);
if (create) { if (create) {
engine.create(new Engine.Create(null, new Term("_id", sId), pDoc)); engine.create(new Engine.Create(null, new Term("_id", sId), pDoc));
} else { } else {

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.ElasticSearchException;
/**
* @author paikan (benjamin.deveze)
*/
public class TimestampParsingException extends ElasticSearchException {
private final String timestamp;
public TimestampParsingException(String timestamp) {
super("failed to parse timestamp [" + timestamp + "]");
this.timestamp = timestamp;
}
public String timestamp() {
return timestamp;
}
}

View File

@ -112,6 +112,7 @@ public class BulkRequest implements ActionRequest {
String id = null; String id = null;
String routing = null; String routing = null;
String parent = null; String parent = null;
String timestamp = null;
String opType = null; String opType = null;
long version = 0; long version = 0;
VersionType versionType = VersionType.INTERNAL; VersionType versionType = VersionType.INTERNAL;
@ -132,6 +133,8 @@ public class BulkRequest implements ActionRequest {
routing = parser.text(); routing = parser.text();
} else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) { } else if ("_parent".equals(currentFieldName) || "parent".equals(currentFieldName)) {
parent = parser.text(); parent = parser.text();
} else if ("_timestamp".equals(currentFieldName) || "timestamp".equals(currentFieldName)) {
timestamp = parser.text();
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) { } else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text(); opType = parser.text();
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
@ -154,17 +157,17 @@ public class BulkRequest implements ActionRequest {
// order is important, we set parent after routing, so routing will be set to parent if not set explicitly // order is important, we set parent after routing, so routing will be set to parent if not set explicitly
if ("index".equals(action)) { if ("index".equals(action)) {
if (opType == null) { if (opType == null) {
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType) add(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
.source(data, from, nextMarker - from, contentUnsafe) .source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate)); .percolate(percolate));
} else { } else {
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType) add(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
.create("create".equals(opType)) .create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe) .source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate)); .percolate(percolate));
} }
} else if ("create".equals(action)) { } else if ("create".equals(action)) {
add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType) add(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).version(version).versionType(versionType)
.create(true) .create(true)
.source(data, from, nextMarker - from, contentUnsafe) .source(data, from, nextMarker - from, contentUnsafe)
.percolate(percolate)); .percolate(percolate));

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -161,11 +162,16 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
ActionRequest request = bulkRequest.requests.get(i); ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request; IndexRequest indexRequest = (IndexRequest) request;
// handle routing // handle routing & timestamp
boolean needToParseExternalTimestamp = indexRequest.timestamp() != null;
MappingMetaData mappingMd = clusterState.metaData().index(indexRequest.index()).mapping(indexRequest.type()); MappingMetaData mappingMd = clusterState.metaData().index(indexRequest.index()).mapping(indexRequest.type());
if (mappingMd != null) { if (mappingMd != null) {
try { try {
indexRequest.processRouting(mappingMd); if (needToParseExternalTimestamp) {
indexRequest.parseStringTimestamp(indexRequest.timestamp(), mappingMd.tsDateTimeFormatter());
needToParseExternalTimestamp = false;
}
indexRequest.processRoutingAndTimestamp(mappingMd);
} catch (ElasticSearchException e) { } catch (ElasticSearchException e) {
responses[i] = new BulkItemResponse(i, indexRequest.opType().toString().toLowerCase(), responses[i] = new BulkItemResponse(i, indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getDetailedMessage())); new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getDetailedMessage()));
@ -173,6 +179,16 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
} }
} }
// Try to parse external timestamp if necessary with no mapping
if (needToParseExternalTimestamp) {
indexRequest.parseStringTimestamp(indexRequest.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER);
}
// The timestamp has not been set neither externally nor in the source doc so we generate it
if (indexRequest.timestamp() == null) {
indexRequest.generateTimestamp();
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId(); ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId); List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) { if (list == null) {

View File

@ -123,7 +123,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} }
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()); .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp());
long version; long version;
Engine.IndexingOperation op; Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
@ -232,7 +232,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
IndexRequest indexRequest = (IndexRequest) item.request(); IndexRequest indexRequest = (IndexRequest) item.request();
try { try {
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()); .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA); Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index); indexShard.index(index);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.TimestampParsingException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
@ -34,8 +35,11 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required; import org.elasticsearch.common.Required;
import org.elasticsearch.common.Unicode; import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -45,6 +49,7 @@ import org.elasticsearch.index.VersionType;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.action.Actions.*; import static org.elasticsearch.action.Actions.*;
@ -115,6 +120,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private String id; private String id;
@Nullable private String routing; @Nullable private String routing;
@Nullable private String parent; @Nullable private String parent;
@Nullable private String timestamp;
private byte[] source; private byte[] source;
private int sourceOffset; private int sourceOffset;
@ -276,6 +282,47 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this.parent; return this.parent;
} }
public IndexRequest timestamp(String timestamp) {
this.timestamp = timestamp;
return this;
}
public String timestamp() {
return this.timestamp;
}
public void parseStringTimestamp(String timestampAsString, FormatDateTimeFormatter dateTimeFormatter) {
long ts;
try {
ts = Long.parseLong(timestampAsString);
} catch (NumberFormatException e) {
try {
ts = dateTimeFormatter.parser().parseMillis(timestampAsString);
} catch (RuntimeException e1) {
throw new TimestampParsingException(timestampAsString);
}
}
timestamp = String.valueOf(ts);
}
public long getParsedTimestamp() {
if (timestamp != null) {
try {
return Long.parseLong(timestamp);
} catch (NumberFormatException e1) {
}
}
// The timestamp is always set as a parsable long
return -1;
}
public IndexRequest generateTimestamp() {
timestamp = String.valueOf(new Date().getTime());
return this;
}
/** /**
* The source of the document to index, recopied to a new array if it has an offset or unsafe. * The source of the document to index, recopied to a new array if it has an offset or unsafe.
*/ */
@ -562,15 +609,24 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this.percolate; return this.percolate;
} }
public void processRouting(MappingMetaData mappingMd) throws ElasticSearchException { public void processRoutingAndTimestamp(MappingMetaData mappingMd) throws ElasticSearchException {
if (routing == null && mappingMd.routing().hasPath()) { boolean shouldParseRouting = (routing == null && mappingMd.routing().hasPath());
boolean shouldParseTimestamp = (timestamp == null && mappingMd.timestamp().hasPath());
if (shouldParseRouting || shouldParseTimestamp) {
XContentParser parser = null; XContentParser parser = null;
try { try {
parser = XContentFactory.xContent(source, sourceOffset, sourceLength) parser = XContentFactory.xContent(source, sourceOffset, sourceLength)
.createParser(source, sourceOffset, sourceLength); .createParser(source, sourceOffset, sourceLength);
routing = mappingMd.parseRouting(parser); Tuple<String, String> parseResult = mappingMd.parseRoutingAndTimestamp(parser, shouldParseRouting, shouldParseTimestamp);
if (shouldParseRouting) {
routing = parseResult.v1();
}
if (shouldParseTimestamp) {
timestamp = parseResult.v2();
}
} catch (Exception e) { } catch (Exception e) {
throw new ElasticSearchParseException("failed to parse doc to extract routing", e); throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e);
} finally { } finally {
if (parser != null) { if (parser != null) {
parser.close(); parser.close();
@ -581,6 +637,10 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (mappingMd.routing().required() && routing == null) { if (mappingMd.routing().required() && routing == null) {
throw new RoutingMissingException(index, type, id); throw new RoutingMissingException(index, type, id);
} }
// Process parsed timestamp found in source
if (shouldParseTimestamp && timestamp != null) {
parseStringTimestamp(timestamp, mappingMd.tsDateTimeFormatter());
}
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
@ -595,6 +655,9 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (in.readBoolean()) { if (in.readBoolean()) {
parent = in.readUTF(); parent = in.readUTF();
} }
if (in.readBoolean()) {
timestamp = in.readUTF();
}
sourceUnsafe = false; sourceUnsafe = false;
sourceOffset = 0; sourceOffset = 0;
@ -632,6 +695,12 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeBoolean(true); out.writeBoolean(true);
out.writeUTF(parent); out.writeUTF(parent);
} }
if (timestamp == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(timestamp);
}
out.writeVInt(sourceLength); out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength); out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id()); out.writeByte(opType.id());

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.percolator.PercolatorExecutor; import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -52,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -124,14 +126,29 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) { private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
boolean needToParseExternalTimestamp = request.timestamp() != null;
MetaData metaData = clusterService.state().metaData(); MetaData metaData = clusterService.state().metaData();
request.routing(metaData.resolveIndexRouting(request.routing(), request.index())); request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
request.index(metaData.concreteIndex(request.index())); request.index(metaData.concreteIndex(request.index()));
if (metaData.hasIndex(request.index())) { if (metaData.hasIndex(request.index())) {
MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type()); MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type());
if (mappingMd != null) { if (mappingMd != null) {
request.processRouting(mappingMd); // Try to parse externally provided timestamp if necessary (with a mapping)
if (needToParseExternalTimestamp) {
request.parseStringTimestamp(request.timestamp(), mappingMd.tsDateTimeFormatter());
needToParseExternalTimestamp = false; // parseTimestamp throws an exception if something gone wrong
} }
request.processRoutingAndTimestamp(mappingMd);
}
}
// Try to parse external timestamp if necessary (without mapping)
if (needToParseExternalTimestamp) {
request.parseStringTimestamp(request.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER);
}
// The timestamp has not been sets neither externally nor in the source doc so we generate it
if (request.timestamp() == null) {
request.generateTimestamp();
} }
super.doExecute(request, listener); super.doExecute(request, listener);
} }
@ -178,7 +195,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()); .routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp());
long version; long version;
Engine.IndexingOperation op; Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
@ -233,7 +250,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
IndexRequest request = shardRequest.request; IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()); .routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp());
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version()) .version(request.version())

View File

@ -19,13 +19,20 @@
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -74,16 +81,64 @@ public class MappingMetaData {
} }
} }
public static class Timestamp {
public static final Timestamp EMPTY = new Timestamp(false, null, TimestampFieldMapper.DEFAULT_DATE_TIME_FORMAT);
private final boolean enabled;
private final String path;
private final String format;
private final String[] pathElements;
public Timestamp(boolean enabled, String path, String format) {
this.enabled = enabled;
this.path = path;
if (path == null) {
pathElements = Strings.EMPTY_ARRAY;
} else {
pathElements = Strings.delimitedListToStringArray(path, ".");
}
this.format = format;
}
public boolean enabled() {
return enabled;
}
public boolean hasPath() {
return path != null;
}
public String path() {
return this.path;
}
public String[] pathElements() {
return this.pathElements;
}
public String format() {
return this.format;
}
}
private final String type; private final String type;
private final CompressedString source; private final CompressedString source;
private final Routing routing; private final Routing routing;
private final Timestamp timestamp;
private final FormatDateTimeFormatter tsDateTimeFormatter;
public MappingMetaData(DocumentMapper docMapper) { public MappingMetaData(DocumentMapper docMapper) {
this.type = docMapper.type(); this.type = docMapper.type();
this.source = docMapper.mappingSource(); this.source = docMapper.mappingSource();
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path()); this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
this.timestamp = new Timestamp(docMapper.timestampFieldMapper().enabled(), docMapper.timestampFieldMapper().path(), docMapper.timestampFieldMapper().dateTimeFormatter().format());
this.tsDateTimeFormatter = docMapper.timestampFieldMapper().dateTimeFormatter();
} }
public MappingMetaData(String type, Map<String, Object> mapping) throws IOException { public MappingMetaData(String type, Map<String, Object> mapping) throws IOException {
@ -110,12 +165,35 @@ public class MappingMetaData {
} else { } else {
this.routing = Routing.EMPTY; this.routing = Routing.EMPTY;
} }
if (withoutType.containsKey("_timestamp")) {
boolean enabled = false;
String path = null;
String format = TimestampFieldMapper.DEFAULT_DATE_TIME_FORMAT;
Map<String, Object> timestampNode = (Map<String, Object>) withoutType.get("_timestamp");
for (Map.Entry<String, Object> entry : timestampNode.entrySet()) {
String fieldName = Strings.toUnderscoreCase(entry.getKey());
Object fieldNode = entry.getValue();
if (fieldName.equals("enabled")) {
enabled = nodeBooleanValue(fieldNode);
} else if (fieldName.equals("path")) {
path = fieldNode.toString();
} else if (fieldName.equals("format")) {
format = fieldNode.toString();
}
}
this.timestamp = new Timestamp(enabled, path, format);
} else {
this.timestamp = Timestamp.EMPTY;
}
this.tsDateTimeFormatter = Joda.forPattern(timestamp.format());
} }
MappingMetaData(String type, CompressedString source, Routing routing) { MappingMetaData(String type, CompressedString source, Routing routing, Timestamp timestamp) {
this.type = type; this.type = type;
this.source = source; this.source = source;
this.routing = routing; this.routing = routing;
this.timestamp = timestamp;
this.tsDateTimeFormatter = Joda.forPattern(timestamp.format());
} }
public String type() { public String type() {
@ -130,11 +208,27 @@ public class MappingMetaData {
return this.routing; return this.routing;
} }
public String parseRouting(XContentParser parser) throws IOException { public Timestamp timestamp() {
return parseRouting(parser, 0); return this.timestamp;
} }
private String parseRouting(XContentParser parser, int location) throws IOException { public FormatDateTimeFormatter tsDateTimeFormatter() {
return this.tsDateTimeFormatter;
}
public Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
boolean shouldParseRouting,
boolean shouldParseTimestamp) throws IOException {
return parseRoutingAndTimestamp(parser, 0, 0, null, null, shouldParseRouting, shouldParseTimestamp);
}
private Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
int locationRouting,
int locationTimestamp,
@Nullable String routingValue,
@Nullable String timestampValue,
boolean shouldParseRouting,
boolean shouldParseTimestamp) throws IOException {
XContentParser.Token t = parser.currentToken(); XContentParser.Token t = parser.currentToken();
if (t == null) { if (t == null) {
t = parser.nextToken(); t = parser.nextToken();
@ -142,25 +236,67 @@ public class MappingMetaData {
if (t == XContentParser.Token.START_OBJECT) { if (t == XContentParser.Token.START_OBJECT) {
t = parser.nextToken(); t = parser.nextToken();
} }
String routingPart = routing().pathElements()[location]; String routingPart = shouldParseRouting ? routing().pathElements()[locationRouting] : null;
String timestampPart = shouldParseTimestamp ? timestamp().pathElements()[locationTimestamp] : null;
for (; t == XContentParser.Token.FIELD_NAME; t = parser.nextToken()) { for (; t == XContentParser.Token.FIELD_NAME; t = parser.nextToken()) {
// Must point to field name // Must point to field name
String fieldName = parser.currentName(); String fieldName = parser.currentName();
// And then the value... // And then the value...
t = parser.nextToken(); t = parser.nextToken();
if (routingPart.equals(fieldName)) {
if (location + 1 == routing.pathElements().length) { boolean incLocationRouting = false;
return parser.textOrNull(); boolean incLocationTimestamp = false;
if (shouldParseRouting && routingPart.equals(fieldName)) {
if (locationRouting + 1 == routing.pathElements().length) {
routingValue = parser.textOrNull();
shouldParseRouting = false;
} else {
incLocationRouting = true;
} }
}
if (shouldParseTimestamp && timestampPart.equals(fieldName)) {
if (locationTimestamp + 1 == timestamp.pathElements().length) {
timestampValue = parser.textOrNull();
shouldParseTimestamp = false;
} else {
incLocationTimestamp = true;
}
}
if (incLocationRouting || incLocationTimestamp) {
if (t == XContentParser.Token.START_OBJECT) { if (t == XContentParser.Token.START_OBJECT) {
return parseRouting(parser, location + 1); locationRouting += incLocationRouting ? 1 : 0;
locationTimestamp += incLocationTimestamp ? 1 : 0;
Tuple<String, String> result = parseRoutingAndTimestamp(parser, locationRouting, locationTimestamp, routingValue, timestampValue,
shouldParseRouting, shouldParseTimestamp);
routingValue = result.v1();
timestampValue = result.v2();
if (incLocationRouting) {
if (routingValue != null) {
shouldParseRouting = false;
} else {
locationRouting--;
}
}
if (incLocationTimestamp) {
if (timestampValue != null) {
shouldParseTimestamp = false;
} else {
locationTimestamp--;
}
}
} }
} else { } else {
parser.skipChildren(); parser.skipChildren();
} }
if (!shouldParseRouting && !shouldParseTimestamp) {
return Tuple.create(routingValue, timestampValue);
} }
return null; }
return Tuple.create(routingValue, timestampValue);
} }
public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException { public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException {
@ -174,6 +310,15 @@ public class MappingMetaData {
} else { } else {
out.writeBoolean(false); out.writeBoolean(false);
} }
// timestamp
out.writeBoolean(mappingMd.timestamp().enabled());
if (mappingMd.timestamp().hasPath()) {
out.writeBoolean(true);
out.writeUTF(mappingMd.timestamp().path());
} else {
out.writeBoolean(false);
}
out.writeUTF(mappingMd.timestamp().format());
} }
public static MappingMetaData readFrom(StreamInput in) throws IOException { public static MappingMetaData readFrom(StreamInput in) throws IOException {
@ -181,6 +326,8 @@ public class MappingMetaData {
CompressedString source = CompressedString.readCompressedString(in); CompressedString source = CompressedString.readCompressedString(in);
// routing // routing
Routing routing = new Routing(in.readBoolean(), in.readBoolean() ? in.readUTF() : null); Routing routing = new Routing(in.readBoolean(), in.readBoolean() ? in.readUTF() : null);
return new MappingMetaData(type, source, routing); // timestamp
Timestamp timestamp = new Timestamp(in.readBoolean(), in.readBoolean() ? in.readUTF() : null, in.readUTF());
return new MappingMetaData(type, source, routing, timestamp);
} }
} }

View File

@ -61,4 +61,16 @@ public class Tuple<V1, V2> {
result = 31 * result + (v2 != null ? v2.hashCode() : 0); result = 31 * result + (v2 != null ? v2.hashCode() : 0);
return result; return result;
} }
/**
* Helper function to create a tuple.
*
* @param v1 the first element for the resulting tuple
* @param v2 the second element for the resulting tuple
* @return the tuple (<code>v1</code>,<code>v2</code>)
*/
public static <V1, V2> Tuple<V1, V2> create(V1 v1, V2 v2)
{
return new Tuple<V1, V2>(v1, v2);
}
} }

View File

@ -363,6 +363,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.routing(); return this.doc.routing();
} }
public long timestamp() {
return this.doc.timestamp();
}
public long version() { public long version() {
return this.version; return this.version;
} }
@ -511,6 +515,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.parent(); return this.doc.parent();
} }
public long timestamp() {
return this.doc.timestamp();
}
public byte[] source() { public byte[] source() {
return this.doc.source(); return this.doc.source();
} }

View File

@ -166,6 +166,7 @@ public class DocumentMapper implements ToXContent {
this.rootMappers.put(AnalyzerMapper.class, new AnalyzerMapper()); this.rootMappers.put(AnalyzerMapper.class, new AnalyzerMapper());
this.rootMappers.put(BoostFieldMapper.class, new BoostFieldMapper()); this.rootMappers.put(BoostFieldMapper.class, new BoostFieldMapper());
this.rootMappers.put(RoutingFieldMapper.class, new RoutingFieldMapper()); this.rootMappers.put(RoutingFieldMapper.class, new RoutingFieldMapper());
this.rootMappers.put(TimestampFieldMapper.class, new TimestampFieldMapper());
this.rootMappers.put(UidFieldMapper.class, new UidFieldMapper()); this.rootMappers.put(UidFieldMapper.class, new UidFieldMapper());
// don't add parent field, by default its "null" // don't add parent field, by default its "null"
} }
@ -359,6 +360,10 @@ public class DocumentMapper implements ToXContent {
return rootMapper(ParentFieldMapper.class); return rootMapper(ParentFieldMapper.class);
} }
public TimestampFieldMapper timestampFieldMapper() {
return rootMapper(TimestampFieldMapper.class);
}
public Analyzer indexAnalyzer() { public Analyzer indexAnalyzer() {
return this.indexAnalyzer; return this.indexAnalyzer;
} }
@ -477,7 +482,7 @@ public class DocumentMapper implements ToXContent {
if (context.docs().size() > 1) { if (context.docs().size() > 1) {
Collections.reverse(context.docs()); Collections.reverse(context.docs());
} }
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), context.docs(), context.analyzer(), ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), context.docs(), context.analyzer(),
context.source(), context.mappersAdded()).parent(source.parent()); context.source(), context.mappersAdded()).parent(source.parent());
// reset the context to free up memory // reset the context to free up memory
context.reset(null, null, null, null); context.reset(null, null, null, null);

View File

@ -96,6 +96,7 @@ public class DocumentMapperParser extends AbstractIndexComponent {
.put(BoostFieldMapper.NAME, new BoostFieldMapper.TypeParser()) .put(BoostFieldMapper.NAME, new BoostFieldMapper.TypeParser())
.put(ParentFieldMapper.NAME, new ParentFieldMapper.TypeParser()) .put(ParentFieldMapper.NAME, new ParentFieldMapper.TypeParser())
.put(RoutingFieldMapper.NAME, new RoutingFieldMapper.TypeParser()) .put(RoutingFieldMapper.NAME, new RoutingFieldMapper.TypeParser())
.put(TimestampFieldMapper.NAME, new TimestampFieldMapper.TypeParser())
.put(UidFieldMapper.NAME, new UidFieldMapper.TypeParser()) .put(UidFieldMapper.NAME, new UidFieldMapper.TypeParser())
.put(IdFieldMapper.NAME, new IdFieldMapper.TypeParser()) .put(IdFieldMapper.NAME, new IdFieldMapper.TypeParser())
.immutableMap(); .immutableMap();

View File

@ -22,15 +22,7 @@ package org.elasticsearch.index.mapper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.core.*; import org.elasticsearch.index.mapper.core.*;
import org.elasticsearch.index.mapper.internal.AllFieldMapper; import org.elasticsearch.index.mapper.internal.*;
import org.elasticsearch.index.mapper.internal.AnalyzerMapper;
import org.elasticsearch.index.mapper.internal.BoostFieldMapper;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.mapper.ip.IpFieldMapper;
import org.elasticsearch.index.mapper.multifield.MultiFieldMapper; import org.elasticsearch.index.mapper.multifield.MultiFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.mapper.object.ObjectMapper;
@ -77,6 +69,10 @@ public final class MapperBuilders {
return new IndexFieldMapper.Builder(); return new IndexFieldMapper.Builder();
} }
public static TimestampFieldMapper.Builder timestamp() {
return new TimestampFieldMapper.Builder();
}
public static BoostFieldMapper.Builder boost(String name) { public static BoostFieldMapper.Builder boost(String name) {
return new BoostFieldMapper.Builder(name); return new BoostFieldMapper.Builder(name);
} }

View File

@ -40,6 +40,8 @@ public class ParsedDocument {
private final String routing; private final String routing;
private final long timestamp;
private final List<Document> documents; private final List<Document> documents;
private final Analyzer analyzer; private final Analyzer analyzer;
@ -50,15 +52,16 @@ public class ParsedDocument {
private String parent; private String parent;
public ParsedDocument(String uid, String id, String type, String routing, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) { public ParsedDocument(String uid, String id, String type, String routing, long timestamp, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this(uid, id, type, routing, Arrays.asList(document), analyzer, source, mappersAdded); this(uid, id, type, routing, timestamp, Arrays.asList(document), analyzer, source, mappersAdded);
} }
public ParsedDocument(String uid, String id, String type, String routing, List<Document> documents, Analyzer analyzer, byte[] source, boolean mappersAdded) { public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List<Document> documents, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this.uid = uid; this.uid = uid;
this.id = id; this.id = id;
this.type = type; this.type = type;
this.routing = routing; this.routing = routing;
this.timestamp = timestamp;
this.documents = documents; this.documents = documents;
this.source = source; this.source = source;
this.analyzer = analyzer; this.analyzer = analyzer;
@ -81,6 +84,10 @@ public class ParsedDocument {
return this.routing; return this.routing;
} }
public long timestamp() {
return this.timestamp;
}
public Document rootDoc() { public Document rootDoc() {
return documents.get(documents.size() - 1); return documents.get(documents.size() - 1);
} }

View File

@ -48,6 +48,8 @@ public class SourceToParse {
private String parentId; private String parentId;
private long timestamp;
public SourceToParse(XContentParser parser) { public SourceToParse(XContentParser parser) {
this.parser = parser; this.parser = parser;
this.source = null; this.source = null;
@ -110,4 +112,13 @@ public class SourceToParse {
this.routing = routing; this.routing = routing;
return this; return this;
} }
public long timestamp() {
return this.timestamp;
}
public SourceToParse timestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}
} }

View File

@ -0,0 +1,225 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.internal;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.InternalMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MergeContext;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.RootMapper;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.index.mapper.core.LongFieldMapper;
import org.elasticsearch.index.mapper.core.NumberFieldMapper;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.*;
import static org.elasticsearch.index.mapper.MapperBuilders.*;
import static org.elasticsearch.index.mapper.core.TypeParsers.*;
/**
* @author paikan (benjamin.deveze)
*/
public class TimestampFieldMapper extends DateFieldMapper implements InternalMapper, RootMapper {
public static final String NAME = "_timestamp";
public static final String CONTENT_TYPE = "_timestamp";
public static final String DEFAULT_DATE_TIME_FORMAT = "dateOptionalTime";
public static class Defaults extends DateFieldMapper.Defaults {
public static final String NAME = "_timestamp";
public static final Field.Store STORE = Field.Store.NO;
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
public static final boolean ENABLED = false;
public static final String PATH = null;
public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern(DEFAULT_DATE_TIME_FORMAT);
}
public static class Builder extends NumberFieldMapper.Builder<Builder, TimestampFieldMapper> {
private boolean enabled = Defaults.ENABLED;
private String path = Defaults.PATH;
private FormatDateTimeFormatter dateTimeFormatter = Defaults.DATE_TIME_FORMATTER;
public Builder() {
super(Defaults.NAME);
store = Defaults.STORE;
index = Defaults.INDEX;
}
public Builder enabled(boolean enabled) {
this.enabled = enabled;
return builder;
}
public Builder path(String path) {
this.path = path;
return builder;
}
public Builder dateTimeFormatter(FormatDateTimeFormatter dateTimeFormatter) {
this.dateTimeFormatter = dateTimeFormatter;
return builder;
}
@Override public TimestampFieldMapper build(BuilderContext context) {
return new TimestampFieldMapper(store, index, enabled, path, dateTimeFormatter);
}
}
public static class TypeParser implements Mapper.TypeParser {
@Override public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
TimestampFieldMapper.Builder builder = timestamp();
parseField(builder, builder.name, node, parserContext);
for (Map.Entry<String, Object> entry : node.entrySet()) {
String fieldName = Strings.toUnderscoreCase(entry.getKey());
Object fieldNode = entry.getValue();
if (fieldName.equals("enabled")) {
builder.enabled(nodeBooleanValue(fieldNode));
} else if (fieldName.equals("path")) {
builder.path(fieldNode.toString());
} else if (fieldName.equals("format")) {
builder.dateTimeFormatter(parseDateTimeFormatter(builder.name(), fieldNode.toString()));
}
}
return builder;
}
}
private boolean enabled;
private final String path;
private final FormatDateTimeFormatter dateTimeFormatter;
public TimestampFieldMapper() {
this(Defaults.STORE, Defaults.INDEX, Defaults.ENABLED, Defaults.PATH, Defaults.DATE_TIME_FORMATTER);
}
protected TimestampFieldMapper(Field.Store store, Field.Index index, boolean enabled, String path, FormatDateTimeFormatter dateTimeFormatter) {
super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), dateTimeFormatter,
Defaults.PRECISION_STEP, Defaults.FUZZY_FACTOR, index, store, Defaults.BOOST, Defaults.OMIT_NORMS,
Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Defaults.NULL_VALUE);
this.enabled = enabled;
this.path = path;
this.dateTimeFormatter = dateTimeFormatter;
}
public boolean enabled() {
return this.enabled;
}
public String path() {
return this.path;
}
public FormatDateTimeFormatter dateTimeFormatter() {
return this.dateTimeFormatter;
}
/**
* Override the default behavior to return a timestamp
*/
@Override public Object valueForSearch(Fieldable field) {
return value(field);
}
@Override public String valueAsString(Fieldable field) {
Long value = value(field);
if (value == null) {
return null;
}
return value.toString();
}
@Override public void validate(ParseContext context) throws MapperParsingException {
}
@Override public void preParse(ParseContext context) throws IOException {
super.parse(context);
}
@Override public void postParse(ParseContext context) throws IOException {
}
@Override public void parse(ParseContext context) throws IOException {
// nothing to do here, we call the parent in preParse
}
@Override public boolean includeInObject() {
return true;
}
@Override protected Fieldable parseCreateField(ParseContext context) throws IOException {
if (enabled) {
long timestamp = context.sourceToParse().timestamp();
if (!indexed() && !stored()) {
context.ignoredValue(names.indexName(), String.valueOf(timestamp));
return null;
}
return new LongFieldMapper.CustomLongNumericField(this, timestamp);
}
return null;
}
@Override protected String contentType() {
return CONTENT_TYPE;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// if all are defaults, no sense to write it at all
if (index == Defaults.INDEX && store == Defaults.STORE && enabled == Defaults.ENABLED && path == Defaults.PATH
&& dateTimeFormatter.format().equals(Defaults.DATE_TIME_FORMATTER.format())) {
return builder;
}
builder.startObject(CONTENT_TYPE);
if (index != Defaults.INDEX) {
builder.field("index", index.name().toLowerCase());
}
if (store != Defaults.STORE) {
builder.field("store", store.name().toLowerCase());
}
if (enabled != Defaults.ENABLED) {
builder.field("enabled", enabled);
}
if (path != Defaults.PATH) {
builder.field("path", path);
}
if (!dateTimeFormatter.format().equals(Defaults.DATE_TIME_FORMATTER.format())) {
builder.field("format", dateTimeFormatter.format());
}
builder.endObject();
return builder;
}
@Override public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
// do nothing here, no merging, but also no exception
}
}

View File

@ -520,13 +520,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
case CREATE: case CREATE:
Translog.Create create = (Translog.Create) operation; Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id()) engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent())).version(create.version()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp())).version(create.version())
.origin(Engine.Operation.Origin.RECOVERY)); .origin(Engine.Operation.Origin.RECOVERY));
break; break;
case SAVE: case SAVE:
Translog.Index index = (Translog.Index) operation; Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent())).version(index.version()) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp())).version(index.version())
.origin(Engine.Operation.Origin.RECOVERY)); .origin(Engine.Operation.Origin.RECOVERY));
break; break;
case DELETE: case DELETE:

View File

@ -232,6 +232,7 @@ public interface Translog extends IndexShardComponent {
private byte[] source; private byte[] source;
private String routing; private String routing;
private String parent; private String parent;
private long timestamp;
private long version; private long version;
public Create() { public Create() {
@ -241,6 +242,7 @@ public interface Translog extends IndexShardComponent {
this(create.type(), create.id(), create.source()); this(create.type(), create.id(), create.source());
this.routing = create.routing(); this.routing = create.routing();
this.parent = create.parent(); this.parent = create.parent();
this.timestamp = create.timestamp();
this.version = create.version(); this.version = create.version();
} }
@ -278,6 +280,10 @@ public interface Translog extends IndexShardComponent {
return this.parent; return this.parent;
} }
public long timestamp() {
return this.timestamp;
}
public long version() { public long version() {
return this.version; return this.version;
} }
@ -311,10 +317,13 @@ public interface Translog extends IndexShardComponent {
if (version >= 3) { if (version >= 3) {
this.version = in.readLong(); this.version = in.readLong();
} }
if (version >= 4) {
this.timestamp = in.readLong();
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(3); // version out.writeVInt(4); // version
out.writeUTF(id); out.writeUTF(id);
out.writeUTF(type); out.writeUTF(type);
out.writeVInt(source.length); out.writeVInt(source.length);
@ -332,6 +341,7 @@ public interface Translog extends IndexShardComponent {
out.writeUTF(parent); out.writeUTF(parent);
} }
out.writeLong(version); out.writeLong(version);
out.writeLong(timestamp);
} }
} }
@ -342,6 +352,7 @@ public interface Translog extends IndexShardComponent {
private byte[] source; private byte[] source;
private String routing; private String routing;
private String parent; private String parent;
private long timestamp;
public Index() { public Index() {
} }
@ -351,6 +362,7 @@ public interface Translog extends IndexShardComponent {
this.routing = index.routing(); this.routing = index.routing();
this.parent = index.parent(); this.parent = index.parent();
this.version = index.version(); this.version = index.version();
this.timestamp = index.timestamp();
} }
public Index(String type, String id, byte[] source) { public Index(String type, String id, byte[] source) {
@ -383,6 +395,10 @@ public interface Translog extends IndexShardComponent {
return this.parent; return this.parent;
} }
public long timestamp() {
return this.timestamp;
}
public byte[] source() { public byte[] source() {
return this.source; return this.source;
} }
@ -420,10 +436,13 @@ public interface Translog extends IndexShardComponent {
if (version >= 3) { if (version >= 3) {
this.version = in.readLong(); this.version = in.readLong();
} }
if (version >= 4) {
this.timestamp = in.readLong();
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(3); // version out.writeVInt(4); // version
out.writeUTF(id); out.writeUTF(id);
out.writeUTF(type); out.writeUTF(type);
out.writeVInt(source.length); out.writeVInt(source.length);
@ -441,6 +460,7 @@ public interface Translog extends IndexShardComponent {
out.writeUTF(parent); out.writeUTF(parent);
} }
out.writeLong(version); out.writeLong(version);
out.writeLong(timestamp);
} }
} }

View File

@ -64,6 +64,7 @@ public class RestIndexAction extends BaseRestHandler {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.routing(request.param("routing")); indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe()); indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh())); indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));

View File

@ -1,104 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.testng.annotations.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class ParseRoutingTests {
@Test public void testParseRouting() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""), new MappingMetaData.Routing(true, "test"));
byte[] bytes = jsonBuilder().startObject().field("aaa", "wr").field("test", "value").field("zzz", "wr").endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo("value"));
bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.startObject("obj1").field("ob1_field", "obj1_value").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo("value"));
}
@Test public void testParseRoutingWithPath() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""), new MappingMetaData.Routing(true, "obj1.field2"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.startObject("obj1").field("field1", "value1").field("field2", "value2").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo("value2"));
}
@Test public void testParseRoutingWithRepeatedField() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""), new MappingMetaData.Routing(true, "field1.field1"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.field("field1", "bar")
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo(null));
}
@Test public void testParseRoutingWithRepeatedFieldAndObject() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""), new MappingMetaData.Routing(true, "field1.field1.field2"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.startObject("field1").field("field2", "bar").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo(null));
}
@Test public void testParseRoutingWithRepeatedFieldAndValidRouting() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""), new MappingMetaData.Routing(true, "field1.field2"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.startObject("field1").field("field2", "bar").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
assertThat(md.parseRouting(XContentFactory.xContent(bytes).createParser(bytes)), equalTo("bar"));
}
}

View File

@ -0,0 +1,189 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.testng.annotations.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class ParseRoutingTimestampTests {
@Test public void testParseRoutingAlone() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "routing"),
new MappingMetaData.Timestamp(true, "timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.field("routing", "routing_value").field("timestamp", "1").endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, false);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo(null));
}
@Test public void testParseTimestampAlone() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "routing"),
new MappingMetaData.Timestamp(true, "timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.field("routing", "routing_value").field("timestamp", "1").endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), false, true);
assertThat(parsed.v1(), equalTo(null));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingAndTimestamp() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "routing"),
new MappingMetaData.Timestamp(true, "timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.field("routing", "routing_value").field("timestamp", "1").endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingAndTimestampWithPath() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "obj1.routing"),
new MappingMetaData.Timestamp(true, "obj2.timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.startObject("obj0").field("field1", "value1").field("field2", "value2").endObject()
.startObject("obj1").field("routing", "routing_value").endObject()
.startObject("obj2").field("timestamp", "1").endObject()
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingAndTimestampWithinSamePath() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "obj1.routing"),
new MappingMetaData.Timestamp(true, "obj1.timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.startObject("obj0").field("field1", "value1").field("field2", "value2").endObject()
.startObject("obj1").field("routing", "routing_value").field("timestamp", "1").endObject()
.startObject("obj2").field("field1", "value1").endObject()
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingAndTimestampWithinSamePathAndMoreLevels() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "obj1.obj2.routing"),
new MappingMetaData.Timestamp(true, "obj1.obj3.timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.startObject("obj0").field("field1", "value1").field("field2", "value2").endObject()
.startObject("obj1")
.startObject("obj2")
.field("routing", "routing_value")
.endObject()
.startObject("obj3")
.field("timestamp", "1")
.endObject()
.endObject()
.startObject("obj2").field("field1", "value1").endObject()
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingAndTimestampWithSameRepeatedObject() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "obj1.routing"),
new MappingMetaData.Timestamp(true, "obj1.timestamp", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject().field("field1", "value1").field("field2", "value2")
.startObject("obj0").field("field1", "value1").field("field2", "value2").endObject()
.startObject("obj1").field("routing", "routing_value").endObject()
.startObject("obj1").field("timestamp", "1").endObject()
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("routing_value"));
assertThat(parsed.v2(), equalTo("1"));
}
@Test public void testParseRoutingTimestampWithRepeatedField() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "field1.field1"),
new MappingMetaData.Timestamp(true, "field1", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.field("field1", "bar")
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo(null));
assertThat(parsed.v2(), equalTo("foo"));
}
@Test public void testParseRoutingWithRepeatedFieldAndObject() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "field1.field1.field2"),
new MappingMetaData.Timestamp(true, "field1", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.startObject("field1").field("field2", "bar").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo(null));
assertThat(parsed.v2(), equalTo("foo"));
}
@Test public void testParseRoutingWithRepeatedFieldAndValidRouting() throws Exception {
MappingMetaData md = new MappingMetaData("type1", new CompressedString(""),
new MappingMetaData.Routing(true, "field1.field2"),
new MappingMetaData.Timestamp(true, "field1", "dateOptionalTime"));
byte[] bytes = jsonBuilder().startObject()
.field("aaa", "wr")
.array("arr1", "1", "2", "3")
.field("field1", "foo")
.startObject("field1").field("field2", "bar").endObject()
.field("test", "value")
.field("zzz", "wr")
.endObject().copiedBytes();
Tuple<String, String> parsed = md.parseRoutingAndTimestamp(XContentFactory.xContent(bytes).createParser(bytes), true, true);
assertThat(parsed.v1(), equalTo("bar"));
assertThat(parsed.v2(), equalTo("foo"));
}
}

View File

@ -138,10 +138,10 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.isEmpty(), equalTo(true)); assertThat(segments.isEmpty(), equalTo(true));
// create a doc and refresh // create a doc and refresh
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc)); engine.create(new Engine.Create(null, newUid("1"), doc));
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.refresh(new Engine.Refresh(true)); engine.refresh(new Engine.Refresh(true));
@ -162,7 +162,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(segments.get(0).deletedDocs(), equalTo(0)); assertThat(segments.get(0).deletedDocs(), equalTo(0));
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3)); engine.create(new Engine.Create(null, newUid("3"), doc3));
engine.refresh(new Engine.Refresh(true)); engine.refresh(new Engine.Refresh(true));
@ -202,7 +202,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release(); searchResult.release();
// create a document // create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc)); engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there... // its not there...
@ -236,7 +236,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(getResult.docIdAndVersion(), notNullValue()); assertThat(getResult.docIdAndVersion(), notNullValue());
// now do an update // now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false); doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).add(field(SourceFieldMapper.NAME, B_2, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.index(new Engine.Index(null, newUid("1"), doc)); engine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet... // its not updated yet...
@ -285,7 +285,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release(); searchResult.release();
// add it back // add it back
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc)); engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there... // its not there...
@ -317,7 +317,7 @@ public abstract class AbstractSimpleEngineTests {
// make sure we can still work with the engine // make sure we can still work with the engine
// now do an update // now do an update
doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.index(new Engine.Index(null, newUid("1"), doc)); engine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet... // its not updated yet...
@ -345,7 +345,7 @@ public abstract class AbstractSimpleEngineTests {
searchResult.release(); searchResult.release();
// create a document // create a document
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc)); engine.create(new Engine.Create(null, newUid("1"), doc));
// its not there... // its not there...
@ -378,7 +378,7 @@ public abstract class AbstractSimpleEngineTests {
@Test public void testSimpleSnapshot() throws Exception { @Test public void testSimpleSnapshot() throws Exception {
// create a document // create a document
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1)); engine.create(new Engine.Create(null, newUid("1"), doc1));
final ExecutorService executorService = Executors.newCachedThreadPool(); final ExecutorService executorService = Executors.newCachedThreadPool();
@ -394,10 +394,10 @@ public abstract class AbstractSimpleEngineTests {
Future<Object> future = executorService.submit(new Callable<Object>() { Future<Object> future = executorService.submit(new Callable<Object>() {
@Override public Object call() throws Exception { @Override public Object call() throws Exception {
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3)); engine.create(new Engine.Create(null, newUid("3"), doc3));
return null; return null;
} }
@ -432,7 +432,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testSimpleRecover() throws Exception { @Test public void testSimpleRecover() throws Exception {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc)); engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
@ -473,10 +473,10 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception { @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1)); engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() { engine.recover(new Engine.RecoveryHandler() {
@ -500,10 +500,10 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception { @Test public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc1 = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1)); engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, -1, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2)); engine.create(new Engine.Create(null, newUid("2"), doc2));
engine.recover(new Engine.RecoveryHandler() { engine.recover(new Engine.RecoveryHandler() {
@ -517,7 +517,7 @@ public abstract class AbstractSimpleEngineTests {
assertThat(create.source(), equalTo(B_2)); assertThat(create.source(), equalTo(B_2));
// add for phase3 // add for phase3
ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, -1, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3)); engine.create(new Engine.Create(null, newUid("3"), doc3));
} }
@ -534,7 +534,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningNewCreate() { @Test public void testVersioningNewCreate() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc); Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
@ -545,7 +545,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testExternalVersioningNewCreate() { @Test public void testExternalVersioningNewCreate() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(12l)); assertThat(create.version(), equalTo(12l));
@ -556,7 +556,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningNewIndex() { @Test public void testVersioningNewIndex() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -567,7 +567,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testExternalVersioningNewIndex() { @Test public void testExternalVersioningNewIndex() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
@ -578,7 +578,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningIndexConflict() { @Test public void testVersioningIndexConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -606,7 +606,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testExternalVersioningIndexConflict() { @Test public void testExternalVersioningIndexConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
@ -625,7 +625,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningIndexConflictWithFlush() { @Test public void testVersioningIndexConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -655,7 +655,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testExternalVersioningIndexConflictWithFlush() { @Test public void testExternalVersioningIndexConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
@ -676,7 +676,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningDeleteConflict() { @Test public void testVersioningDeleteConflict() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -726,7 +726,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningDeleteConflictWithFlush() { @Test public void testVersioningDeleteConflictWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -782,7 +782,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningCreateExistsException() { @Test public void testVersioningCreateExistsException() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc); Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
@ -797,7 +797,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningCreateExistsExceptionWithFlush() { @Test public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc); Engine.Create create = new Engine.Create(null, newUid("1"), doc);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
@ -814,7 +814,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningReplicaConflict1() { @Test public void testVersioningReplicaConflict1() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -848,7 +848,7 @@ public abstract class AbstractSimpleEngineTests {
} }
@Test public void testVersioningReplicaConflict2() { @Test public void testVersioningReplicaConflict2() {
ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = new ParsedDocument("1", "1", "test", null, -1, doc().add(uidField("1")).build(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc); Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.timestamp;
import org.apache.lucene.document.Field;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperTests;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
* @author paikan (benjamin.deveze)
*/
public class TimestampMappingTests {
@Test public void testSimpleDisabled() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
byte[] source = XContentFactory.jsonBuilder()
.startObject()
.field("field", "value")
.endObject()
.copiedBytes();
ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").timestamp(1));
assertThat(doc.rootDoc().getFieldable("_timestamp"), equalTo(null));
}
@Test public void testEnabled() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("_timestamp").field("enabled", "yes").field("store", "yes").endObject()
.endObject().endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
byte[] source = XContentFactory.jsonBuilder()
.startObject()
.field("field", "value")
.endObject()
.copiedBytes();
ParsedDocument doc = docMapper.parse(SourceToParse.source(source).type("type").id("1").timestamp(1));
assertThat(doc.rootDoc().getFieldable("_timestamp").isStored(), equalTo(true));
assertThat(doc.rootDoc().getFieldable("_timestamp").isIndexed(), equalTo(true));
assertThat(doc.rootDoc().getFieldable("_timestamp").tokenStreamValue(), notNullValue());
}
@Test public void testDefaultValues() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type").endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
assertThat(docMapper.timestampFieldMapper().enabled(), equalTo(TimestampFieldMapper.Defaults.ENABLED));
assertThat(docMapper.timestampFieldMapper().store(), equalTo(TimestampFieldMapper.Defaults.STORE));
assertThat(docMapper.timestampFieldMapper().index(), equalTo(TimestampFieldMapper.Defaults.INDEX));
assertThat(docMapper.timestampFieldMapper().path(), equalTo(null));
assertThat(docMapper.timestampFieldMapper().dateTimeFormatter().format(), equalTo(TimestampFieldMapper.DEFAULT_DATE_TIME_FORMAT));
}
@Test public void testSetValues() throws Exception {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("_timestamp")
.field("enabled", "yes").field("store", "yes").field("index", "no")
.field("path", "timestamp").field("format", "year")
.endObject()
.endObject().endObject().string();
DocumentMapper docMapper = MapperTests.newParser().parse(mapping);
assertThat(docMapper.timestampFieldMapper().enabled(), equalTo(true));
assertThat(docMapper.timestampFieldMapper().store(), equalTo(Field.Store.YES));
assertThat(docMapper.timestampFieldMapper().index(), equalTo(Field.Index.NO));
assertThat(docMapper.timestampFieldMapper().path(), equalTo("timestamp"));
assertThat(docMapper.timestampFieldMapper().dateTimeFormatter().format(), equalTo("year"));
}
}