From ba352e0dc6f3078344a3bf1bec2c1e4233357848 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 29 Aug 2011 17:46:01 +0300 Subject: [PATCH] cleanup timestamp work --- .../action/bulk/TransportBulkAction.java | 42 ++----- .../action/bulk/TransportShardBulkAction.java | 4 +- .../elasticsearch/action/get/GetField.java | 11 ++ .../action/index/IndexRequest.java | 105 +++++++----------- .../action/index/TransportIndexAction.java | 45 +++----- .../action/index/IndexRequestBuilder.java | 8 ++ .../cluster/metadata/MappingMetaData.java | 49 +++++--- .../index/mapper/SourceToParse.java | 5 + .../mapper/internal/TimestampFieldMapper.java | 8 +- .../timestamp/SimpleTimestampTests.java | 96 ++++++++++++++++ 10 files changed, 224 insertions(+), 149 deletions(-) create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/timestamp/SimpleTimestampTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index cb1d89e3c73..6249bd54179 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -33,6 +32,7 @@ import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.UUID; @@ -41,7 +41,6 @@ import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; @@ -136,11 +135,10 @@ public class TransportBulkAction extends BaseAction { private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener listener) { ClusterState clusterState = clusterService.state(); + MetaData metaData = clusterState.metaData(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; - indexRequest.routing(clusterState.metaData().resolveIndexRouting(indexRequest.routing(), indexRequest.index())); - indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index())); if (allowIdGeneration) { if (indexRequest.id() == null) { indexRequest.id(UUID.randomBase64UUID()); @@ -148,6 +146,15 @@ public class TransportBulkAction extends BaseAction { indexRequest.opType(IndexRequest.OpType.CREATE); } } + + String aliasOrIndex = indexRequest.index(); + indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index())); + + MappingMetaData mappingMd = null; + if (metaData.hasIndex(indexRequest.index())) { + mappingMd = metaData.index(indexRequest.index()).mapping(indexRequest.type()); + } + indexRequest.process(metaData, aliasOrIndex, mappingMd); } else if (request instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) request; deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index())); @@ -162,33 +169,6 @@ public class TransportBulkAction extends BaseAction { ActionRequest request = bulkRequest.requests.get(i); if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; - // handle routing & timestamp - boolean needToParseExternalTimestamp = indexRequest.timestamp() != null; - MappingMetaData mappingMd = clusterState.metaData().index(indexRequest.index()).mapping(indexRequest.type()); - if (mappingMd != null) { - try { - if (needToParseExternalTimestamp) { - indexRequest.parseStringTimestamp(indexRequest.timestamp(), mappingMd.tsDateTimeFormatter()); - needToParseExternalTimestamp = false; - } - indexRequest.processRoutingAndTimestamp(mappingMd); - } catch (ElasticSearchException e) { - responses[i] = new BulkItemResponse(i, indexRequest.opType().toString().toLowerCase(), - new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getDetailedMessage())); - continue; - } - } - - // Try to parse external timestamp if necessary with no mapping - if (needToParseExternalTimestamp) { - indexRequest.parseStringTimestamp(indexRequest.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER); - } - - // The timestamp has not been set neither externally nor in the source doc so we generate it - if (indexRequest.timestamp() == null) { - indexRequest.generateTimestamp(); - } - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId(); List list = requestsByShard.get(shardId); if (list == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 506ae39fa60..f0b91e50edb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -123,7 +123,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) - .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp()); + .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); long version; Engine.IndexingOperation op; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { @@ -232,7 +232,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation IndexRequest indexRequest = (IndexRequest) item.request(); try { SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) - .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp()); + .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA); indexShard.index(index); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java index 18be5731524..646b249fa43 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetField.java @@ -55,6 +55,17 @@ public class GetField implements Streamable, Iterable { return name; } + public Object value() { + if (values != null && !values.isEmpty()) { + return values.get(0); + } + return null; + } + + public Object getValue() { + return value(); + } + public List values() { return values; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 2e1c6e7aa71..aaecf960ea3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -26,30 +26,28 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; -import org.elasticsearch.action.TimestampParsingException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Required; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.joda.FormatDateTimeFormatter; -import org.elasticsearch.common.joda.Joda; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import java.io.IOException; import java.util.Arrays; -import java.util.Date; import java.util.Map; import static org.elasticsearch.action.Actions.*; @@ -282,6 +280,9 @@ public class IndexRequest extends ShardReplicationOperationRequest { return this.parent; } + /** + * Sets the timestamp either as millis since the epoch, or, in the configured date format. + */ public IndexRequest timestamp(String timestamp) { this.timestamp = timestamp; return this; @@ -291,38 +292,6 @@ public class IndexRequest extends ShardReplicationOperationRequest { return this.timestamp; } - public void parseStringTimestamp(String timestampAsString, FormatDateTimeFormatter dateTimeFormatter) { - long ts; - try { - ts = Long.parseLong(timestampAsString); - } catch (NumberFormatException e) { - try { - ts = dateTimeFormatter.parser().parseMillis(timestampAsString); - } catch (RuntimeException e1) { - throw new TimestampParsingException(timestampAsString); - } - } - timestamp = String.valueOf(ts); - } - - public long getParsedTimestamp() { - if (timestamp != null) { - try { - return Long.parseLong(timestamp); - } catch (NumberFormatException e1) { - - } - } - - // The timestamp is always set as a parsable long - return -1; - } - - public IndexRequest generateTimestamp() { - timestamp = String.valueOf(new Date().getTime()); - return this; - } - /** * The source of the document to index, recopied to a new array if it has an offset or unsafe. */ @@ -609,37 +578,47 @@ public class IndexRequest extends ShardReplicationOperationRequest { return this.percolate; } - public void processRoutingAndTimestamp(MappingMetaData mappingMd) throws ElasticSearchException { - boolean shouldParseRouting = (routing == null && mappingMd.routing().hasPath()); - boolean shouldParseTimestamp = (timestamp == null && mappingMd.timestamp().hasPath()); + public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd) throws ElasticSearchException { + // resolve the routing if needed + routing(metaData.resolveIndexRouting(routing, aliasOrIndex)); + // resolve timestamp if provided externally + if (timestamp != null) { + timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, + mappingMd != null ? mappingMd.timestamp().dateTimeFormatter() : TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER); + } + // extract values if needed + if (mappingMd != null) { + boolean shouldParseRouting = (routing == null && mappingMd.routing().hasPath()); + boolean shouldParseTimestamp = (timestamp == null && mappingMd.timestamp().hasPath()); - if (shouldParseRouting || shouldParseTimestamp) { - XContentParser parser = null; - try { - parser = XContentFactory.xContent(source, sourceOffset, sourceLength) - .createParser(source, sourceOffset, sourceLength); - Tuple parseResult = mappingMd.parseRoutingAndTimestamp(parser, shouldParseRouting, shouldParseTimestamp); - if (shouldParseRouting) { - routing = parseResult.v1(); - } - if (shouldParseTimestamp) { - timestamp = parseResult.v2(); - } - } catch (Exception e) { - throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e); - } finally { - if (parser != null) { - parser.close(); + if (shouldParseRouting || shouldParseTimestamp) { + XContentParser parser = null; + try { + parser = XContentFactory.xContent(source, sourceOffset, sourceLength).createParser(source, sourceOffset, sourceLength); + Tuple parseResult = mappingMd.parseRoutingAndTimestamp(parser, shouldParseRouting, shouldParseTimestamp); + if (shouldParseRouting) { + routing = parseResult.v1(); + } + if (shouldParseTimestamp) { + timestamp = parseResult.v2(); + timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter()); + } + } catch (Exception e) { + throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e); + } finally { + if (parser != null) { + parser.close(); + } } } + // might as well check for routing here + if (mappingMd.routing().required() && routing == null) { + throw new RoutingMissingException(index, type, id); + } } - // might as well check for routing here - if (mappingMd.routing().required() && routing == null) { - throw new RoutingMissingException(index, type, id); - } - // Process parsed timestamp found in source - if (shouldParseTimestamp && timestamp != null) { - parseStringTimestamp(timestamp, mappingMd.tsDateTimeFormatter()); + // generate timestamp if not provided, we always have one post this stage... + if (timestamp == null) { + timestamp = String.valueOf(System.currentTimeMillis()); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 0d18c79eb56..ecc2d81f6fa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -43,7 +43,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.percolator.PercolatorExecutor; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; @@ -53,7 +52,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -93,13 +91,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override protected void doExecute(final IndexRequest request, final ActionListener listener) { - if (allowIdGeneration) { - if (request.id() == null) { - request.id(UUID.randomBase64UUID()); - // since we generate the id, change it to CREATE - request.opType(IndexRequest.OpType.CREATE); - } - } if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) { request.beforeLocalFork(); // we fork on another thread... createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener() { @@ -126,30 +117,22 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } private void innerExecute(final IndexRequest request, final ActionListener listener) { - boolean needToParseExternalTimestamp = request.timestamp() != null; - MetaData metaData = clusterService.state().metaData(); - request.routing(metaData.resolveIndexRouting(request.routing(), request.index())); - request.index(metaData.concreteIndex(request.index())); - if (metaData.hasIndex(request.index())) { - MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type()); - if (mappingMd != null) { - // Try to parse externally provided timestamp if necessary (with a mapping) - if (needToParseExternalTimestamp) { - request.parseStringTimestamp(request.timestamp(), mappingMd.tsDateTimeFormatter()); - needToParseExternalTimestamp = false; // parseTimestamp throws an exception if something gone wrong - } - request.processRoutingAndTimestamp(mappingMd); + if (allowIdGeneration) { + if (request.id() == null) { + request.id(UUID.randomBase64UUID()); + // since we generate the id, change it to CREATE + request.opType(IndexRequest.OpType.CREATE); } } - // Try to parse external timestamp if necessary (without mapping) - if (needToParseExternalTimestamp) { - request.parseStringTimestamp(request.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER); - } - // The timestamp has not been sets neither externally nor in the source doc so we generate it - if (request.timestamp() == null) { - request.generateTimestamp(); + MetaData metaData = clusterService.state().metaData(); + String aliasOrIndex = request.index(); + request.index(metaData.concreteIndex(request.index())); + MappingMetaData mappingMd = null; + if (metaData.hasIndex(request.index())) { + mappingMd = metaData.index(request.index()).mapping(request.type()); } + request.process(metaData, aliasOrIndex, mappingMd); super.doExecute(request, listener); } @@ -195,7 +178,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexShard(shardRequest); SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) - .routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp()); + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); long version; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { @@ -250,7 +233,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexShard(shardRequest); IndexRequest request = shardRequest.request; SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) - .routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp()); + .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse) .version(request.version()) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java index 3ce17f387f9..8b914b77d7a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java @@ -303,6 +303,14 @@ public class IndexRequestBuilder extends BaseRequestBuilder mapping) throws IOException { @@ -185,7 +204,6 @@ public class MappingMetaData { } else { this.timestamp = Timestamp.EMPTY; } - this.tsDateTimeFormatter = Joda.forPattern(timestamp.format()); } MappingMetaData(String type, CompressedString source, Routing routing, Timestamp timestamp) { @@ -193,7 +211,6 @@ public class MappingMetaData { this.source = source; this.routing = routing; this.timestamp = timestamp; - this.tsDateTimeFormatter = Joda.forPattern(timestamp.format()); } public String type() { @@ -212,23 +229,19 @@ public class MappingMetaData { return this.timestamp; } - public FormatDateTimeFormatter tsDateTimeFormatter() { - return this.tsDateTimeFormatter; - } - public Tuple parseRoutingAndTimestamp(XContentParser parser, - boolean shouldParseRouting, - boolean shouldParseTimestamp) throws IOException { + boolean shouldParseRouting, + boolean shouldParseTimestamp) throws IOException { return parseRoutingAndTimestamp(parser, 0, 0, null, null, shouldParseRouting, shouldParseTimestamp); } private Tuple parseRoutingAndTimestamp(XContentParser parser, - int locationRouting, - int locationTimestamp, - @Nullable String routingValue, - @Nullable String timestampValue, - boolean shouldParseRouting, - boolean shouldParseTimestamp) throws IOException { + int locationRouting, + int locationTimestamp, + @Nullable String routingValue, + @Nullable String timestampValue, + boolean shouldParseRouting, + boolean shouldParseTimestamp) throws IOException { XContentParser.Token t = parser.currentToken(); if (t == null) { t = parser.nextToken(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index 7731e450ae2..f20a750ebcd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -117,6 +117,11 @@ public class SourceToParse { return this.timestamp; } + public SourceToParse timestamp(String timestamp) { + this.timestamp = Long.parseLong(timestamp); + return this; + } + public SourceToParse timestamp(long timestamp) { this.timestamp = timestamp; return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TimestampFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TimestampFieldMapper.java index 5b33c12eef3..bb66f57caef 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TimestampFieldMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/TimestampFieldMapper.java @@ -180,10 +180,10 @@ public class TimestampFieldMapper extends DateFieldMapper implements InternalMap @Override protected Fieldable parseCreateField(ParseContext context) throws IOException { if (enabled) { long timestamp = context.sourceToParse().timestamp(); - if (!indexed() && !stored()) { - context.ignoredValue(names.indexName(), String.valueOf(timestamp)); - return null; - } + if (!indexed() && !stored()) { + context.ignoredValue(names.indexName(), String.valueOf(timestamp)); + return null; + } return new LongFieldMapper.CustomLongNumericField(this, timestamp); } return null; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/timestamp/SimpleTimestampTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/timestamp/SimpleTimestampTests.java new file mode 100644 index 00000000000..f918d418b69 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/timestamp/SimpleTimestampTests.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.timestamp; + +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + */ +public class SimpleTimestampTests extends AbstractNodesTests { + + private Client client; + + @BeforeClass public void createNodes() throws Exception { + startNode("node1"); + startNode("node2"); + client = getClient(); + } + + @AfterClass public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("node1"); + } + + @Test public void testSimpleTimestamp() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.admin().indices().prepareCreate("test") + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_timestamp").field("enabled", true).field("store", "yes").endObject().endObject().endObject()) + .execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + logger.info("--> check with automatic timestamp"); + long now1 = System.currentTimeMillis(); + client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet(); + long now2 = System.currentTimeMillis(); + + // we need to add support for fetching _timestamp from the translog in realtime case... + GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + long timestamp = ((Number) getResponse.field("_timestamp").value()).longValue(); + assertThat(timestamp, greaterThanOrEqualTo(now1)); + assertThat(timestamp, lessThanOrEqualTo(now2)); + // verify its the same timestamp when going the replica + getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp)); + + logger.info("--> check with custom timestamp (numeric)"); + client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimestamp("10").setRefresh(true).execute().actionGet(); + + getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + timestamp = ((Number) getResponse.field("_timestamp").value()).longValue(); + assertThat(timestamp, equalTo(10l)); + // verify its the same timestamp when going the replica + getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp)); + + logger.info("--> check with custom timestamp (string)"); + client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimestamp("1970-01-01T00:00:00.020").setRefresh(true).execute().actionGet(); + + getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + timestamp = ((Number) getResponse.field("_timestamp").value()).longValue(); + assertThat(timestamp, equalTo(20l)); + // verify its the same timestamp when going the replica + getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet(); + assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp)); + } +}