cleanup timestamp work
This commit is contained in:
parent
bb02f19f88
commit
ba352e0dc6
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.bulk;
|
package org.elasticsearch.action.bulk;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticSearchException;
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
|
@ -33,6 +32,7 @@ import org.elasticsearch.action.support.BaseAction;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.common.UUID;
|
import org.elasticsearch.common.UUID;
|
||||||
|
@ -41,7 +41,6 @@ import org.elasticsearch.common.collect.Maps;
|
||||||
import org.elasticsearch.common.collect.Sets;
|
import org.elasticsearch.common.collect.Sets;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -136,11 +135,10 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
|
||||||
|
|
||||||
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener) {
|
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener) {
|
||||||
ClusterState clusterState = clusterService.state();
|
ClusterState clusterState = clusterService.state();
|
||||||
|
MetaData metaData = clusterState.metaData();
|
||||||
for (ActionRequest request : bulkRequest.requests) {
|
for (ActionRequest request : bulkRequest.requests) {
|
||||||
if (request instanceof IndexRequest) {
|
if (request instanceof IndexRequest) {
|
||||||
IndexRequest indexRequest = (IndexRequest) request;
|
IndexRequest indexRequest = (IndexRequest) request;
|
||||||
indexRequest.routing(clusterState.metaData().resolveIndexRouting(indexRequest.routing(), indexRequest.index()));
|
|
||||||
indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index()));
|
|
||||||
if (allowIdGeneration) {
|
if (allowIdGeneration) {
|
||||||
if (indexRequest.id() == null) {
|
if (indexRequest.id() == null) {
|
||||||
indexRequest.id(UUID.randomBase64UUID());
|
indexRequest.id(UUID.randomBase64UUID());
|
||||||
|
@ -148,6 +146,15 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
|
||||||
indexRequest.opType(IndexRequest.OpType.CREATE);
|
indexRequest.opType(IndexRequest.OpType.CREATE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String aliasOrIndex = indexRequest.index();
|
||||||
|
indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index()));
|
||||||
|
|
||||||
|
MappingMetaData mappingMd = null;
|
||||||
|
if (metaData.hasIndex(indexRequest.index())) {
|
||||||
|
mappingMd = metaData.index(indexRequest.index()).mapping(indexRequest.type());
|
||||||
|
}
|
||||||
|
indexRequest.process(metaData, aliasOrIndex, mappingMd);
|
||||||
} else if (request instanceof DeleteRequest) {
|
} else if (request instanceof DeleteRequest) {
|
||||||
DeleteRequest deleteRequest = (DeleteRequest) request;
|
DeleteRequest deleteRequest = (DeleteRequest) request;
|
||||||
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
|
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
|
||||||
|
@ -162,33 +169,6 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
|
||||||
ActionRequest request = bulkRequest.requests.get(i);
|
ActionRequest request = bulkRequest.requests.get(i);
|
||||||
if (request instanceof IndexRequest) {
|
if (request instanceof IndexRequest) {
|
||||||
IndexRequest indexRequest = (IndexRequest) request;
|
IndexRequest indexRequest = (IndexRequest) request;
|
||||||
// handle routing & timestamp
|
|
||||||
boolean needToParseExternalTimestamp = indexRequest.timestamp() != null;
|
|
||||||
MappingMetaData mappingMd = clusterState.metaData().index(indexRequest.index()).mapping(indexRequest.type());
|
|
||||||
if (mappingMd != null) {
|
|
||||||
try {
|
|
||||||
if (needToParseExternalTimestamp) {
|
|
||||||
indexRequest.parseStringTimestamp(indexRequest.timestamp(), mappingMd.tsDateTimeFormatter());
|
|
||||||
needToParseExternalTimestamp = false;
|
|
||||||
}
|
|
||||||
indexRequest.processRoutingAndTimestamp(mappingMd);
|
|
||||||
} catch (ElasticSearchException e) {
|
|
||||||
responses[i] = new BulkItemResponse(i, indexRequest.opType().toString().toLowerCase(),
|
|
||||||
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getDetailedMessage()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to parse external timestamp if necessary with no mapping
|
|
||||||
if (needToParseExternalTimestamp) {
|
|
||||||
indexRequest.parseStringTimestamp(indexRequest.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER);
|
|
||||||
}
|
|
||||||
|
|
||||||
// The timestamp has not been set neither externally nor in the source doc so we generate it
|
|
||||||
if (indexRequest.timestamp() == null) {
|
|
||||||
indexRequest.generateTimestamp();
|
|
||||||
}
|
|
||||||
|
|
||||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
|
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
|
||||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
|
|
|
@ -123,7 +123,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
|
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
|
||||||
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp());
|
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
|
||||||
long version;
|
long version;
|
||||||
Engine.IndexingOperation op;
|
Engine.IndexingOperation op;
|
||||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||||
|
@ -232,7 +232,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
IndexRequest indexRequest = (IndexRequest) item.request();
|
IndexRequest indexRequest = (IndexRequest) item.request();
|
||||||
try {
|
try {
|
||||||
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
|
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
|
||||||
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.getParsedTimestamp());
|
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
|
||||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
|
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
|
||||||
indexShard.index(index);
|
indexShard.index(index);
|
||||||
|
|
|
@ -55,6 +55,17 @@ public class GetField implements Streamable, Iterable<Object> {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Object value() {
|
||||||
|
if (values != null && !values.isEmpty()) {
|
||||||
|
return values.get(0);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getValue() {
|
||||||
|
return value();
|
||||||
|
}
|
||||||
|
|
||||||
public List<Object> values() {
|
public List<Object> values() {
|
||||||
return values;
|
return values;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,30 +26,28 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||||
import org.elasticsearch.ElasticSearchParseException;
|
import org.elasticsearch.ElasticSearchParseException;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.RoutingMissingException;
|
import org.elasticsearch.action.RoutingMissingException;
|
||||||
import org.elasticsearch.action.TimestampParsingException;
|
|
||||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationType;
|
import org.elasticsearch.action.support.replication.ReplicationType;
|
||||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||||
import org.elasticsearch.client.Requests;
|
import org.elasticsearch.client.Requests;
|
||||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Required;
|
import org.elasticsearch.common.Required;
|
||||||
import org.elasticsearch.common.Unicode;
|
import org.elasticsearch.common.Unicode;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
|
||||||
import org.elasticsearch.common.joda.Joda;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
|
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.elasticsearch.action.Actions.*;
|
import static org.elasticsearch.action.Actions.*;
|
||||||
|
@ -282,6 +280,9 @@ public class IndexRequest extends ShardReplicationOperationRequest {
|
||||||
return this.parent;
|
return this.parent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the timestamp either as millis since the epoch, or, in the configured date format.
|
||||||
|
*/
|
||||||
public IndexRequest timestamp(String timestamp) {
|
public IndexRequest timestamp(String timestamp) {
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
return this;
|
return this;
|
||||||
|
@ -291,38 +292,6 @@ public class IndexRequest extends ShardReplicationOperationRequest {
|
||||||
return this.timestamp;
|
return this.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void parseStringTimestamp(String timestampAsString, FormatDateTimeFormatter dateTimeFormatter) {
|
|
||||||
long ts;
|
|
||||||
try {
|
|
||||||
ts = Long.parseLong(timestampAsString);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
try {
|
|
||||||
ts = dateTimeFormatter.parser().parseMillis(timestampAsString);
|
|
||||||
} catch (RuntimeException e1) {
|
|
||||||
throw new TimestampParsingException(timestampAsString);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
timestamp = String.valueOf(ts);
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getParsedTimestamp() {
|
|
||||||
if (timestamp != null) {
|
|
||||||
try {
|
|
||||||
return Long.parseLong(timestamp);
|
|
||||||
} catch (NumberFormatException e1) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The timestamp is always set as a parsable long
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public IndexRequest generateTimestamp() {
|
|
||||||
timestamp = String.valueOf(new Date().getTime());
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The source of the document to index, recopied to a new array if it has an offset or unsafe.
|
* The source of the document to index, recopied to a new array if it has an offset or unsafe.
|
||||||
*/
|
*/
|
||||||
|
@ -609,37 +578,47 @@ public class IndexRequest extends ShardReplicationOperationRequest {
|
||||||
return this.percolate;
|
return this.percolate;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processRoutingAndTimestamp(MappingMetaData mappingMd) throws ElasticSearchException {
|
public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd) throws ElasticSearchException {
|
||||||
boolean shouldParseRouting = (routing == null && mappingMd.routing().hasPath());
|
// resolve the routing if needed
|
||||||
boolean shouldParseTimestamp = (timestamp == null && mappingMd.timestamp().hasPath());
|
routing(metaData.resolveIndexRouting(routing, aliasOrIndex));
|
||||||
|
// resolve timestamp if provided externally
|
||||||
|
if (timestamp != null) {
|
||||||
|
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp,
|
||||||
|
mappingMd != null ? mappingMd.timestamp().dateTimeFormatter() : TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER);
|
||||||
|
}
|
||||||
|
// extract values if needed
|
||||||
|
if (mappingMd != null) {
|
||||||
|
boolean shouldParseRouting = (routing == null && mappingMd.routing().hasPath());
|
||||||
|
boolean shouldParseTimestamp = (timestamp == null && mappingMd.timestamp().hasPath());
|
||||||
|
|
||||||
if (shouldParseRouting || shouldParseTimestamp) {
|
if (shouldParseRouting || shouldParseTimestamp) {
|
||||||
XContentParser parser = null;
|
XContentParser parser = null;
|
||||||
try {
|
try {
|
||||||
parser = XContentFactory.xContent(source, sourceOffset, sourceLength)
|
parser = XContentFactory.xContent(source, sourceOffset, sourceLength).createParser(source, sourceOffset, sourceLength);
|
||||||
.createParser(source, sourceOffset, sourceLength);
|
Tuple<String, String> parseResult = mappingMd.parseRoutingAndTimestamp(parser, shouldParseRouting, shouldParseTimestamp);
|
||||||
Tuple<String, String> parseResult = mappingMd.parseRoutingAndTimestamp(parser, shouldParseRouting, shouldParseTimestamp);
|
if (shouldParseRouting) {
|
||||||
if (shouldParseRouting) {
|
routing = parseResult.v1();
|
||||||
routing = parseResult.v1();
|
}
|
||||||
}
|
if (shouldParseTimestamp) {
|
||||||
if (shouldParseTimestamp) {
|
timestamp = parseResult.v2();
|
||||||
timestamp = parseResult.v2();
|
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e);
|
throw new ElasticSearchParseException("failed to parse doc to extract routing/timestamp", e);
|
||||||
} finally {
|
} finally {
|
||||||
if (parser != null) {
|
if (parser != null) {
|
||||||
parser.close();
|
parser.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// might as well check for routing here
|
||||||
|
if (mappingMd.routing().required() && routing == null) {
|
||||||
|
throw new RoutingMissingException(index, type, id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// might as well check for routing here
|
// generate timestamp if not provided, we always have one post this stage...
|
||||||
if (mappingMd.routing().required() && routing == null) {
|
if (timestamp == null) {
|
||||||
throw new RoutingMissingException(index, type, id);
|
timestamp = String.valueOf(System.currentTimeMillis());
|
||||||
}
|
|
||||||
// Process parsed timestamp found in source
|
|
||||||
if (shouldParseTimestamp && timestamp != null) {
|
|
||||||
parseStringTimestamp(timestamp, mappingMd.tsDateTimeFormatter());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.mapper.SourceToParse;
|
import org.elasticsearch.index.mapper.SourceToParse;
|
||||||
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
|
||||||
import org.elasticsearch.index.percolator.PercolatorExecutor;
|
import org.elasticsearch.index.percolator.PercolatorExecutor;
|
||||||
import org.elasticsearch.index.service.IndexService;
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.shard.service.IndexShard;
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
|
@ -53,7 +52,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -93,13 +91,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
@Override protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||||
if (allowIdGeneration) {
|
|
||||||
if (request.id() == null) {
|
|
||||||
request.id(UUID.randomBase64UUID());
|
|
||||||
// since we generate the id, change it to CREATE
|
|
||||||
request.opType(IndexRequest.OpType.CREATE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) {
|
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) {
|
||||||
request.beforeLocalFork(); // we fork on another thread...
|
request.beforeLocalFork(); // we fork on another thread...
|
||||||
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener<CreateIndexResponse>() {
|
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener<CreateIndexResponse>() {
|
||||||
|
@ -126,30 +117,22 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||||
boolean needToParseExternalTimestamp = request.timestamp() != null;
|
if (allowIdGeneration) {
|
||||||
MetaData metaData = clusterService.state().metaData();
|
if (request.id() == null) {
|
||||||
request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
|
request.id(UUID.randomBase64UUID());
|
||||||
request.index(metaData.concreteIndex(request.index()));
|
// since we generate the id, change it to CREATE
|
||||||
if (metaData.hasIndex(request.index())) {
|
request.opType(IndexRequest.OpType.CREATE);
|
||||||
MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type());
|
|
||||||
if (mappingMd != null) {
|
|
||||||
// Try to parse externally provided timestamp if necessary (with a mapping)
|
|
||||||
if (needToParseExternalTimestamp) {
|
|
||||||
request.parseStringTimestamp(request.timestamp(), mappingMd.tsDateTimeFormatter());
|
|
||||||
needToParseExternalTimestamp = false; // parseTimestamp throws an exception if something gone wrong
|
|
||||||
}
|
|
||||||
request.processRoutingAndTimestamp(mappingMd);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to parse external timestamp if necessary (without mapping)
|
MetaData metaData = clusterService.state().metaData();
|
||||||
if (needToParseExternalTimestamp) {
|
String aliasOrIndex = request.index();
|
||||||
request.parseStringTimestamp(request.timestamp(), TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER);
|
request.index(metaData.concreteIndex(request.index()));
|
||||||
}
|
MappingMetaData mappingMd = null;
|
||||||
// The timestamp has not been sets neither externally nor in the source doc so we generate it
|
if (metaData.hasIndex(request.index())) {
|
||||||
if (request.timestamp() == null) {
|
mappingMd = metaData.index(request.index()).mapping(request.type());
|
||||||
request.generateTimestamp();
|
|
||||||
}
|
}
|
||||||
|
request.process(metaData, aliasOrIndex, mappingMd);
|
||||||
super.doExecute(request, listener);
|
super.doExecute(request, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +178,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
|
|
||||||
IndexShard indexShard = indexShard(shardRequest);
|
IndexShard indexShard = indexShard(shardRequest);
|
||||||
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
|
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
|
||||||
.routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp());
|
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
|
||||||
long version;
|
long version;
|
||||||
Engine.IndexingOperation op;
|
Engine.IndexingOperation op;
|
||||||
if (request.opType() == IndexRequest.OpType.INDEX) {
|
if (request.opType() == IndexRequest.OpType.INDEX) {
|
||||||
|
@ -250,7 +233,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
IndexShard indexShard = indexShard(shardRequest);
|
IndexShard indexShard = indexShard(shardRequest);
|
||||||
IndexRequest request = shardRequest.request;
|
IndexRequest request = shardRequest.request;
|
||||||
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
|
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
|
||||||
.routing(request.routing()).parent(request.parent()).timestamp(request.getParsedTimestamp());
|
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
|
||||||
if (request.opType() == IndexRequest.OpType.INDEX) {
|
if (request.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse)
|
Engine.Index index = indexShard.prepareIndex(sourceToParse)
|
||||||
.version(request.version())
|
.version(request.version())
|
||||||
|
|
|
@ -303,6 +303,14 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the timestamp either as millis since the epoch, or, in the configured date format.
|
||||||
|
*/
|
||||||
|
public IndexRequestBuilder setTimestamp(String timestamp) {
|
||||||
|
request.timestamp(timestamp);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should the listener be called on a separate thread if needed.
|
* Should the listener be called on a separate thread if needed.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.metadata;
|
package org.elasticsearch.cluster.metadata;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.TimestampParsingException;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
|
@ -27,8 +28,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||||
import org.elasticsearch.common.joda.Joda;
|
import org.elasticsearch.common.joda.Joda;
|
||||||
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
|
|
||||||
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
|
@ -83,6 +82,21 @@ public class MappingMetaData {
|
||||||
|
|
||||||
public static class Timestamp {
|
public static class Timestamp {
|
||||||
|
|
||||||
|
public static String parseStringTimestamp(String timestampAsString, FormatDateTimeFormatter dateTimeFormatter) throws TimestampParsingException {
|
||||||
|
long ts;
|
||||||
|
try {
|
||||||
|
ts = Long.parseLong(timestampAsString);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
try {
|
||||||
|
ts = dateTimeFormatter.parser().parseMillis(timestampAsString);
|
||||||
|
} catch (RuntimeException e1) {
|
||||||
|
throw new TimestampParsingException(timestampAsString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return String.valueOf(ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static final Timestamp EMPTY = new Timestamp(false, null, TimestampFieldMapper.DEFAULT_DATE_TIME_FORMAT);
|
public static final Timestamp EMPTY = new Timestamp(false, null, TimestampFieldMapper.DEFAULT_DATE_TIME_FORMAT);
|
||||||
|
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
|
@ -93,6 +107,8 @@ public class MappingMetaData {
|
||||||
|
|
||||||
private final String[] pathElements;
|
private final String[] pathElements;
|
||||||
|
|
||||||
|
private final FormatDateTimeFormatter dateTimeFormatter;
|
||||||
|
|
||||||
public Timestamp(boolean enabled, String path, String format) {
|
public Timestamp(boolean enabled, String path, String format) {
|
||||||
this.enabled = enabled;
|
this.enabled = enabled;
|
||||||
this.path = path;
|
this.path = path;
|
||||||
|
@ -102,6 +118,7 @@ public class MappingMetaData {
|
||||||
pathElements = Strings.delimitedListToStringArray(path, ".");
|
pathElements = Strings.delimitedListToStringArray(path, ".");
|
||||||
}
|
}
|
||||||
this.format = format;
|
this.format = format;
|
||||||
|
this.dateTimeFormatter = Joda.forPattern(format);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean enabled() {
|
public boolean enabled() {
|
||||||
|
@ -123,6 +140,10 @@ public class MappingMetaData {
|
||||||
public String format() {
|
public String format() {
|
||||||
return this.format;
|
return this.format;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FormatDateTimeFormatter dateTimeFormatter() {
|
||||||
|
return this.dateTimeFormatter;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String type;
|
private final String type;
|
||||||
|
@ -131,14 +152,12 @@ public class MappingMetaData {
|
||||||
|
|
||||||
private final Routing routing;
|
private final Routing routing;
|
||||||
private final Timestamp timestamp;
|
private final Timestamp timestamp;
|
||||||
private final FormatDateTimeFormatter tsDateTimeFormatter;
|
|
||||||
|
|
||||||
public MappingMetaData(DocumentMapper docMapper) {
|
public MappingMetaData(DocumentMapper docMapper) {
|
||||||
this.type = docMapper.type();
|
this.type = docMapper.type();
|
||||||
this.source = docMapper.mappingSource();
|
this.source = docMapper.mappingSource();
|
||||||
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
|
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
|
||||||
this.timestamp = new Timestamp(docMapper.timestampFieldMapper().enabled(), docMapper.timestampFieldMapper().path(), docMapper.timestampFieldMapper().dateTimeFormatter().format());
|
this.timestamp = new Timestamp(docMapper.timestampFieldMapper().enabled(), docMapper.timestampFieldMapper().path(), docMapper.timestampFieldMapper().dateTimeFormatter().format());
|
||||||
this.tsDateTimeFormatter = docMapper.timestampFieldMapper().dateTimeFormatter();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public MappingMetaData(String type, Map<String, Object> mapping) throws IOException {
|
public MappingMetaData(String type, Map<String, Object> mapping) throws IOException {
|
||||||
|
@ -185,7 +204,6 @@ public class MappingMetaData {
|
||||||
} else {
|
} else {
|
||||||
this.timestamp = Timestamp.EMPTY;
|
this.timestamp = Timestamp.EMPTY;
|
||||||
}
|
}
|
||||||
this.tsDateTimeFormatter = Joda.forPattern(timestamp.format());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MappingMetaData(String type, CompressedString source, Routing routing, Timestamp timestamp) {
|
MappingMetaData(String type, CompressedString source, Routing routing, Timestamp timestamp) {
|
||||||
|
@ -193,7 +211,6 @@ public class MappingMetaData {
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.routing = routing;
|
this.routing = routing;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.tsDateTimeFormatter = Joda.forPattern(timestamp.format());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String type() {
|
public String type() {
|
||||||
|
@ -212,23 +229,19 @@ public class MappingMetaData {
|
||||||
return this.timestamp;
|
return this.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FormatDateTimeFormatter tsDateTimeFormatter() {
|
|
||||||
return this.tsDateTimeFormatter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
|
public Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
|
||||||
boolean shouldParseRouting,
|
boolean shouldParseRouting,
|
||||||
boolean shouldParseTimestamp) throws IOException {
|
boolean shouldParseTimestamp) throws IOException {
|
||||||
return parseRoutingAndTimestamp(parser, 0, 0, null, null, shouldParseRouting, shouldParseTimestamp);
|
return parseRoutingAndTimestamp(parser, 0, 0, null, null, shouldParseRouting, shouldParseTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
|
private Tuple<String, String> parseRoutingAndTimestamp(XContentParser parser,
|
||||||
int locationRouting,
|
int locationRouting,
|
||||||
int locationTimestamp,
|
int locationTimestamp,
|
||||||
@Nullable String routingValue,
|
@Nullable String routingValue,
|
||||||
@Nullable String timestampValue,
|
@Nullable String timestampValue,
|
||||||
boolean shouldParseRouting,
|
boolean shouldParseRouting,
|
||||||
boolean shouldParseTimestamp) throws IOException {
|
boolean shouldParseTimestamp) throws IOException {
|
||||||
XContentParser.Token t = parser.currentToken();
|
XContentParser.Token t = parser.currentToken();
|
||||||
if (t == null) {
|
if (t == null) {
|
||||||
t = parser.nextToken();
|
t = parser.nextToken();
|
||||||
|
|
|
@ -117,6 +117,11 @@ public class SourceToParse {
|
||||||
return this.timestamp;
|
return this.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SourceToParse timestamp(String timestamp) {
|
||||||
|
this.timestamp = Long.parseLong(timestamp);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public SourceToParse timestamp(long timestamp) {
|
public SourceToParse timestamp(long timestamp) {
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -180,10 +180,10 @@ public class TimestampFieldMapper extends DateFieldMapper implements InternalMap
|
||||||
@Override protected Fieldable parseCreateField(ParseContext context) throws IOException {
|
@Override protected Fieldable parseCreateField(ParseContext context) throws IOException {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
long timestamp = context.sourceToParse().timestamp();
|
long timestamp = context.sourceToParse().timestamp();
|
||||||
if (!indexed() && !stored()) {
|
if (!indexed() && !stored()) {
|
||||||
context.ignoredValue(names.indexName(), String.valueOf(timestamp));
|
context.ignoredValue(names.indexName(), String.valueOf(timestamp));
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return new LongFieldMapper.CustomLongNumericField(this, timestamp);
|
return new LongFieldMapper.CustomLongNumericField(this, timestamp);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elastic Search and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Elastic Search licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.test.integration.timestamp;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||||
|
import org.testng.annotations.AfterClass;
|
||||||
|
import org.testng.annotations.BeforeClass;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.*;
|
||||||
|
import static org.hamcrest.Matchers.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class SimpleTimestampTests extends AbstractNodesTests {
|
||||||
|
|
||||||
|
private Client client;
|
||||||
|
|
||||||
|
@BeforeClass public void createNodes() throws Exception {
|
||||||
|
startNode("node1");
|
||||||
|
startNode("node2");
|
||||||
|
client = getClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass public void closeNodes() {
|
||||||
|
client.close();
|
||||||
|
closeAllNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Client getClient() {
|
||||||
|
return client("node1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSimpleTimestamp() throws Exception {
|
||||||
|
client.admin().indices().prepareDelete().execute().actionGet();
|
||||||
|
|
||||||
|
client.admin().indices().prepareCreate("test")
|
||||||
|
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_timestamp").field("enabled", true).field("store", "yes").endObject().endObject().endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||||
|
|
||||||
|
logger.info("--> check with automatic timestamp");
|
||||||
|
long now1 = System.currentTimeMillis();
|
||||||
|
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
|
||||||
|
long now2 = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// we need to add support for fetching _timestamp from the translog in realtime case...
|
||||||
|
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
long timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
|
||||||
|
assertThat(timestamp, greaterThanOrEqualTo(now1));
|
||||||
|
assertThat(timestamp, lessThanOrEqualTo(now2));
|
||||||
|
// verify its the same timestamp when going the replica
|
||||||
|
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));
|
||||||
|
|
||||||
|
logger.info("--> check with custom timestamp (numeric)");
|
||||||
|
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimestamp("10").setRefresh(true).execute().actionGet();
|
||||||
|
|
||||||
|
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
|
||||||
|
assertThat(timestamp, equalTo(10l));
|
||||||
|
// verify its the same timestamp when going the replica
|
||||||
|
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));
|
||||||
|
|
||||||
|
logger.info("--> check with custom timestamp (string)");
|
||||||
|
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setTimestamp("1970-01-01T00:00:00.020").setRefresh(true).execute().actionGet();
|
||||||
|
|
||||||
|
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
|
||||||
|
assertThat(timestamp, equalTo(20l));
|
||||||
|
// verify its the same timestamp when going the replica
|
||||||
|
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
|
||||||
|
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue