diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 200b83ff02b..170af189681 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -162,18 +162,18 @@ public class SimulatePipelineRequest extends ActionRequest { if (pipeline == null) { throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist"); } - List ingestDocumentList = parseDocs(config); + List ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat()); return new Parsed(pipeline, ingestDocumentList, verbose); } static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); - List ingestDocumentList = parseDocs(config); + List ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat()); return new Parsed(pipeline, ingestDocumentList, verbose); } - private static List parseDocs(Map config) { + private static List parseDocs(Map config, boolean newDateFormat) { List> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS); List ingestDocumentList = new ArrayList<>(); for (Map dataMap : docs) { @@ -183,7 +183,7 @@ public class SimulatePipelineRequest extends ActionRequest { ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()), ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()), - document); + document, newDateFormat); ingestDocumentList.add(ingestDocument); } return ingestDocumentList; diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 778b02dd791..0b3f4d4cbc9 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -73,6 +73,7 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.fs.FsService; import org.elasticsearch.monitor.jvm.JvmGcMonitorService; import org.elasticsearch.monitor.jvm.JvmService; @@ -404,6 +405,7 @@ public final class ClusterSettings extends AbstractScopedSettings { SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING, ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, - Node.BREAKER_TYPE_KEY + Node.BREAKER_TYPE_KEY, + IngestService.NEW_INGEST_DATE_FORMAT ))); } diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java index fcf49ef6992..05b92b57723 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.IndexFieldMapper; import org.elasticsearch.index.mapper.ParentFieldMapper; @@ -27,6 +28,8 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -52,6 +55,11 @@ public final class IngestDocument { private final Map ingestMetadata; public IngestDocument(String index, String type, String id, String routing, String parent, Map source) { + this(index, type, id, routing, parent, source, false); + } + + public IngestDocument(String index, String type, String id, String routing, String parent, Map source, + boolean newDateFormat) { this.sourceAndMetadata = new HashMap<>(); this.sourceAndMetadata.putAll(source); this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index); @@ -65,7 +73,11 @@ public final class IngestDocument { } this.ingestMetadata = new HashMap<>(); - this.ingestMetadata.put(TIMESTAMP, new Date()); + if (newDateFormat) { + this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); + } else { + this.ingestMetadata.put(TIMESTAMP, new Date()); + } } /** @@ -608,6 +620,9 @@ public final class IngestDocument { return value; } else if (value instanceof Date) { return ((Date) value).clone(); + } else if (value instanceof ZonedDateTime) { + ZonedDateTime zonedDateTime = (ZonedDateTime) value; + return ZonedDateTime.of(zonedDateTime.toLocalDate(), zonedDateTime.toLocalTime(), zonedDateTime.getZone()); } else { throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]"); } diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index 5249ed7a7dc..1455e37588a 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -32,15 +34,19 @@ import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; +import static org.elasticsearch.common.settings.Setting.Property; + /** * Holder class for several ingest related services. */ public class IngestService { + public static final Setting NEW_INGEST_DATE_FORMAT = + Setting.boolSetting("ingest.new_date_format", false, Property.NodeScope, Property.Dynamic, Property.Deprecated); private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; - public IngestService(Settings settings, ThreadPool threadPool, + public IngestService(ClusterSettings clusterSettings, Settings settings, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { @@ -56,7 +62,7 @@ public class IngestService { } } } - this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories)); + this.pipelineStore = new PipelineStore(clusterSettings, settings, Collections.unmodifiableMap(processorFactories)); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); } diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index c1b46e49567..cd7787a6910 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -160,7 +160,8 @@ public class PipelineExecutionService implements ClusterStateApplier { String routing = indexRequest.routing(); String parent = indexRequest.parent(); Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, + sourceAsMap, store.isNewIngestDateFormat()); pipeline.execute(ingestDocument); Map metadataMap = ingestDocument.extractMetadata(); diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 8c46c14fad6..327ab18907c 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; @@ -51,6 +52,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl private final Pipeline.Factory factory = new Pipeline.Factory(); private final Map processorFactories; + private volatile boolean newIngestDateFormat; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some @@ -58,9 +60,15 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. volatile Map pipelines = new HashMap<>(); - public PipelineStore(Settings settings, Map processorFactories) { + public PipelineStore(ClusterSettings clusterSettings, Settings settings, Map processorFactories) { super(settings); this.processorFactories = processorFactories; + this.newIngestDateFormat = IngestService.NEW_INGEST_DATE_FORMAT.get(settings); + clusterSettings.addSettingsUpdateConsumer(IngestService.NEW_INGEST_DATE_FORMAT, this::setNewIngestDateFormat); + } + + private void setNewIngestDateFormat(Boolean newIngestDateFormat) { + this.newIngestDateFormat = newIngestDateFormat; } @Override @@ -204,6 +212,10 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl return processorFactories; } + public boolean isNewIngestDateFormat() { + return newIngestDateFormat; + } + /** * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines * may be returned diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 28d2f42f592..a8f3921b692 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -339,7 +339,7 @@ public class Node implements Closeable { final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); clusterService.addListener(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final IngestService ingestService = new IngestService(settings, threadPool, this.environment, + final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index e1ec12e29b6..1dc6a0d27ff 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -22,6 +22,8 @@ package org.elasticsearch.ingest; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,13 +47,18 @@ import static org.hamcrest.Matchers.sameInstance; public class IngestDocumentTests extends ESTestCase { private static final Date BOGUS_TIMESTAMP = new Date(0L); + private static final ZonedDateTime BOGUS_TIMESTAMP_NEW_DATE_FORMAT = ZonedDateTime.of(2016, 10, 23, 0, 0, 0, 0, ZoneOffset.UTC); private IngestDocument ingestDocument; + private IngestDocument ingestDocumentWithNewDateFormat; - @Before - public void setIngestDocument() { + public IngestDocument getTestIngestDocument(boolean newDateFormat) { Map document = new HashMap<>(); Map ingestMap = new HashMap<>(); - ingestMap.put("timestamp", BOGUS_TIMESTAMP); + if (newDateFormat) { + ingestMap.put("timestamp", BOGUS_TIMESTAMP_NEW_DATE_FORMAT); + } else { + ingestMap.put("timestamp", BOGUS_TIMESTAMP); + } document.put("_ingest", ingestMap); document.put("foo", "bar"); document.put("int", 123); @@ -72,7 +79,18 @@ public class IngestDocumentTests extends ESTestCase { list.add(null); document.put("list", list); - ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + return new IngestDocument("index", "type", "id", null, null, document, newDateFormat); + } + + @Before + public void setIngestDocuments() { + ingestDocument = getTestIngestDocument(false); + ingestDocumentWithNewDateFormat = getTestIngestDocument(true); + } + + public void testDefaultConstructorUsesDateClass() { + IngestDocument ingestDocument = new IngestDocument("foo", "bar", "baz", "fuzz", "buzz", Collections.emptyMap()); + assertThat(ingestDocument.getFieldValue("_ingest.timestamp", Object.class).getClass(), equalTo(Date.class)); } public void testSimpleGetFieldValue() { @@ -88,6 +106,13 @@ public class IngestDocumentTests extends ESTestCase { assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", Date.class), equalTo(BOGUS_TIMESTAMP)); } + public void testNewDateFormat() { + assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_ingest.timestamp", ZonedDateTime.class), + both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT)))); + assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_source._ingest.timestamp", ZonedDateTime.class), + equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT)); + } + public void testGetSourceObject() { try { ingestDocument.getFieldValue("_source", Object.class); diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3a842a4690a..d36b3390e41 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; @@ -39,7 +40,8 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPlugin() { ThreadPool tp = Mockito.mock(ThreadPool.class); - IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN)); + IngestService ingestService = new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN)); Map factories = ingestService.getPipelineStore().getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); @@ -48,7 +50,8 @@ public class IngestServiceTests extends ESTestCase { public void testIngestPluginDuplicate() { ThreadPool tp = Mockito.mock(ThreadPool.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)) + new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 19a269c3f71..a503ac9aaa8 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -49,6 +50,7 @@ import static org.hamcrest.Matchers.nullValue; public class PipelineStoreTests extends ESTestCase { + private ClusterSettings clusterSettings; private PipelineStore store; @Before @@ -93,7 +95,8 @@ public class PipelineStoreTests extends ESTestCase { } }; }); - store = new PipelineStore(Settings.EMPTY, processorFactories); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + store = new PipelineStore(clusterSettings, Settings.EMPTY, processorFactories); } public void testUpdatePipelines() { @@ -369,4 +372,11 @@ public class PipelineStoreTests extends ESTestCase { store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); } + public void testUpdateIngestNewDateFormatSetting() throws Exception { + assertFalse(store.isNewIngestDateFormat()); + clusterSettings.applySettings(Settings.builder().put(IngestService.NEW_INGEST_DATE_FORMAT.getKey(), true).build()); + assertTrue(store.isNewIngestDateFormat()); + assertWarnings("[ingest.new_date_format] setting was deprecated in Elasticsearch and will be " + + "removed in a future release! See the breaking changes documentation for the next major version."); + } } diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/60_pipeline_timestamp_date_mapping.yaml b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/60_pipeline_timestamp_date_mapping.yaml new file mode 100644 index 00000000000..e172ad9f4ce --- /dev/null +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/60_pipeline_timestamp_date_mapping.yaml @@ -0,0 +1,107 @@ +--- +"Test timestamp templating does not match date-mapping defaults": + - do: + cluster.health: + wait_for_status: green + + - do: + indices.create: + index: timetest + body: + mappings: + test: { "properties": { "my_time": {"type": "date"}}} + + - do: + ingest.put_pipeline: + id: "my_timely_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field": "my_time", + "value": "{{ _ingest.timestamp }}" + } + }, + { + "date" : { + "field" : "my_time", + "target_field": "my_time", + "formats": ["EEE MMM dd HH:mm:ss zzz yyyy"] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: timetest + type: test + id: 1 + pipeline: "my_timely_pipeline" + body: {} + +--- +"Test timestamp templating matches date-mapping defaults with ingest.new_date_format": + - skip: + version: " - 5.3.99" + reason: deprecated in 5.4.0 + features: "warnings" + + - do: + cluster.health: + wait_for_status: green + + - do: + indices.create: + index: timetest_newdateformat + body: + mappings: + test: { "properties": { "my_time": {"type": "date"}}} + + - do: + cluster.put_settings: + body: + transient: + ingest.new_date_format: true + warnings: + - "[ingest.new_date_format] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version." + + - match: {transient: {ingest: {new_date_format: "true"}}} + + - do: + ingest.put_pipeline: + id: "my_timely_pipeline_with_new_date_format" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field": "my_time", + "value": "{{ _ingest.timestamp }}" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: timetest + type: test + id: 1 + pipeline: "my_timely_pipeline_with_new_date_format" + body: {} + + - do: + cluster.put_settings: + body: + transient: + ingest.new_date_format: false + warnings: + - "[ingest.new_date_format] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version." + + - match: {transient: {ingest: {new_date_format: "false"}}}