add option for _ingest.timestamp to use new ZonedDateTime (#24030)
Previously, Mustache would call `toString` on the `_ingest.timestamp` field and return a date format that did not match Elasticsearch's defaults for date-mapping parsing. The new ZonedDateTime class in Java 8 happens to do format itself in the same way ES is expecting. This commit adds support for a feature flag that enables the usage of this new date format that has more native behavior. Fixes #23168. This new fix can be found in the form of a cluster setting called `ingest.new_date_format`. By default, in 5.x, the existing behavior will remain the same. One will set this property to `true` in order to take advantage of this update for ingest-pipeline convenience.
This commit is contained in:
parent
fec1802e2f
commit
423b0f5e3d
|
@ -162,18 +162,18 @@ public class SimulatePipelineRequest extends ActionRequest {
|
||||||
if (pipeline == null) {
|
if (pipeline == null) {
|
||||||
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
|
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
|
||||||
}
|
}
|
||||||
List<IngestDocument> ingestDocumentList = parseDocs(config);
|
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
|
||||||
return new Parsed(pipeline, ingestDocumentList, verbose);
|
return new Parsed(pipeline, ingestDocumentList, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
|
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
|
||||||
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
|
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
|
||||||
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
|
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
|
||||||
List<IngestDocument> ingestDocumentList = parseDocs(config);
|
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
|
||||||
return new Parsed(pipeline, ingestDocumentList, verbose);
|
return new Parsed(pipeline, ingestDocumentList, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<IngestDocument> parseDocs(Map<String, Object> config) {
|
private static List<IngestDocument> parseDocs(Map<String, Object> config, boolean newDateFormat) {
|
||||||
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
|
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
|
||||||
List<IngestDocument> ingestDocumentList = new ArrayList<>();
|
List<IngestDocument> ingestDocumentList = new ArrayList<>();
|
||||||
for (Map<String, Object> dataMap : docs) {
|
for (Map<String, Object> dataMap : docs) {
|
||||||
|
@ -183,7 +183,7 @@ public class SimulatePipelineRequest extends ActionRequest {
|
||||||
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
|
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
|
||||||
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
|
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
|
||||||
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
|
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
|
||||||
document);
|
document, newDateFormat);
|
||||||
ingestDocumentList.add(ingestDocument);
|
ingestDocumentList.add(ingestDocument);
|
||||||
}
|
}
|
||||||
return ingestDocumentList;
|
return ingestDocumentList;
|
||||||
|
|
|
@ -73,6 +73,7 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.indices.store.IndicesStore;
|
import org.elasticsearch.indices.store.IndicesStore;
|
||||||
|
import org.elasticsearch.ingest.IngestService;
|
||||||
import org.elasticsearch.monitor.fs.FsService;
|
import org.elasticsearch.monitor.fs.FsService;
|
||||||
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
|
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
|
||||||
import org.elasticsearch.monitor.jvm.JvmService;
|
import org.elasticsearch.monitor.jvm.JvmService;
|
||||||
|
@ -404,6 +405,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
|
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
|
||||||
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
|
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
|
||||||
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
|
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
|
||||||
Node.BREAKER_TYPE_KEY
|
Node.BREAKER_TYPE_KEY,
|
||||||
|
IngestService.NEW_INGEST_DATE_FORMAT
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.IndexFieldMapper;
|
import org.elasticsearch.index.mapper.IndexFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.ParentFieldMapper;
|
import org.elasticsearch.index.mapper.ParentFieldMapper;
|
||||||
|
@ -27,6 +28,8 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||||
import org.elasticsearch.index.mapper.TypeFieldMapper;
|
import org.elasticsearch.index.mapper.TypeFieldMapper;
|
||||||
|
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
|
@ -52,6 +55,11 @@ public final class IngestDocument {
|
||||||
private final Map<String, Object> ingestMetadata;
|
private final Map<String, Object> ingestMetadata;
|
||||||
|
|
||||||
public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
|
public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
|
||||||
|
this(index, type, id, routing, parent, source, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source,
|
||||||
|
boolean newDateFormat) {
|
||||||
this.sourceAndMetadata = new HashMap<>();
|
this.sourceAndMetadata = new HashMap<>();
|
||||||
this.sourceAndMetadata.putAll(source);
|
this.sourceAndMetadata.putAll(source);
|
||||||
this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
|
this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
|
||||||
|
@ -65,8 +73,12 @@ public final class IngestDocument {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.ingestMetadata = new HashMap<>();
|
this.ingestMetadata = new HashMap<>();
|
||||||
|
if (newDateFormat) {
|
||||||
|
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
|
||||||
|
} else {
|
||||||
this.ingestMetadata.put(TIMESTAMP, new Date());
|
this.ingestMetadata.put(TIMESTAMP, new Date());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
|
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
|
||||||
|
@ -608,6 +620,9 @@ public final class IngestDocument {
|
||||||
return value;
|
return value;
|
||||||
} else if (value instanceof Date) {
|
} else if (value instanceof Date) {
|
||||||
return ((Date) value).clone();
|
return ((Date) value).clone();
|
||||||
|
} else if (value instanceof ZonedDateTime) {
|
||||||
|
ZonedDateTime zonedDateTime = (ZonedDateTime) value;
|
||||||
|
return ZonedDateTime.of(zonedDateTime.toLocalDate(), zonedDateTime.toLocalTime(), zonedDateTime.getZone());
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
|
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||||
|
@ -32,15 +34,19 @@ import org.elasticsearch.plugins.IngestPlugin;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.settings.Setting.Property;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holder class for several ingest related services.
|
* Holder class for several ingest related services.
|
||||||
*/
|
*/
|
||||||
public class IngestService {
|
public class IngestService {
|
||||||
|
public static final Setting<Boolean> NEW_INGEST_DATE_FORMAT =
|
||||||
|
Setting.boolSetting("ingest.new_date_format", false, Property.NodeScope, Property.Dynamic, Property.Deprecated);
|
||||||
|
|
||||||
private final PipelineStore pipelineStore;
|
private final PipelineStore pipelineStore;
|
||||||
private final PipelineExecutionService pipelineExecutionService;
|
private final PipelineExecutionService pipelineExecutionService;
|
||||||
|
|
||||||
public IngestService(Settings settings, ThreadPool threadPool,
|
public IngestService(ClusterSettings clusterSettings, Settings settings, ThreadPool threadPool,
|
||||||
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
|
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
|
||||||
List<IngestPlugin> ingestPlugins) {
|
List<IngestPlugin> ingestPlugins) {
|
||||||
|
|
||||||
|
@ -56,7 +62,7 @@ public class IngestService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories));
|
this.pipelineStore = new PipelineStore(clusterSettings, settings, Collections.unmodifiableMap(processorFactories));
|
||||||
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
|
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,7 +160,8 @@ public class PipelineExecutionService implements ClusterStateApplier {
|
||||||
String routing = indexRequest.routing();
|
String routing = indexRequest.routing();
|
||||||
String parent = indexRequest.parent();
|
String parent = indexRequest.parent();
|
||||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||||
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap);
|
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent,
|
||||||
|
sourceAsMap, store.isNewIngestDateFormat());
|
||||||
pipeline.execute(ingestDocument);
|
pipeline.execute(ingestDocument);
|
||||||
|
|
||||||
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
|
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
|
||||||
|
@ -51,6 +52,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
|
||||||
|
|
||||||
private final Pipeline.Factory factory = new Pipeline.Factory();
|
private final Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
private final Map<String, Processor.Factory> processorFactories;
|
private final Map<String, Processor.Factory> processorFactories;
|
||||||
|
private volatile boolean newIngestDateFormat;
|
||||||
|
|
||||||
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
|
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
|
||||||
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
|
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
|
||||||
|
@ -58,9 +60,15 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
|
||||||
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
|
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
|
||||||
volatile Map<String, Pipeline> pipelines = new HashMap<>();
|
volatile Map<String, Pipeline> pipelines = new HashMap<>();
|
||||||
|
|
||||||
public PipelineStore(Settings settings, Map<String, Processor.Factory> processorFactories) {
|
public PipelineStore(ClusterSettings clusterSettings, Settings settings, Map<String, Processor.Factory> processorFactories) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.processorFactories = processorFactories;
|
this.processorFactories = processorFactories;
|
||||||
|
this.newIngestDateFormat = IngestService.NEW_INGEST_DATE_FORMAT.get(settings);
|
||||||
|
clusterSettings.addSettingsUpdateConsumer(IngestService.NEW_INGEST_DATE_FORMAT, this::setNewIngestDateFormat);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setNewIngestDateFormat(Boolean newIngestDateFormat) {
|
||||||
|
this.newIngestDateFormat = newIngestDateFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -204,6 +212,10 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
|
||||||
return processorFactories;
|
return processorFactories;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isNewIngestDateFormat() {
|
||||||
|
return newIngestDateFormat;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
|
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
|
||||||
* may be returned
|
* may be returned
|
||||||
|
|
|
@ -339,7 +339,7 @@ public class Node implements Closeable {
|
||||||
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
|
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
|
||||||
clusterService.addListener(scriptModule.getScriptService());
|
clusterService.addListener(scriptModule.getScriptService());
|
||||||
resourcesToClose.add(clusterService);
|
resourcesToClose.add(clusterService);
|
||||||
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
|
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
|
||||||
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
||||||
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
|
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.ingest;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -45,13 +47,18 @@ import static org.hamcrest.Matchers.sameInstance;
|
||||||
public class IngestDocumentTests extends ESTestCase {
|
public class IngestDocumentTests extends ESTestCase {
|
||||||
|
|
||||||
private static final Date BOGUS_TIMESTAMP = new Date(0L);
|
private static final Date BOGUS_TIMESTAMP = new Date(0L);
|
||||||
|
private static final ZonedDateTime BOGUS_TIMESTAMP_NEW_DATE_FORMAT = ZonedDateTime.of(2016, 10, 23, 0, 0, 0, 0, ZoneOffset.UTC);
|
||||||
private IngestDocument ingestDocument;
|
private IngestDocument ingestDocument;
|
||||||
|
private IngestDocument ingestDocumentWithNewDateFormat;
|
||||||
|
|
||||||
@Before
|
public IngestDocument getTestIngestDocument(boolean newDateFormat) {
|
||||||
public void setIngestDocument() {
|
|
||||||
Map<String, Object> document = new HashMap<>();
|
Map<String, Object> document = new HashMap<>();
|
||||||
Map<String, Object> ingestMap = new HashMap<>();
|
Map<String, Object> ingestMap = new HashMap<>();
|
||||||
|
if (newDateFormat) {
|
||||||
|
ingestMap.put("timestamp", BOGUS_TIMESTAMP_NEW_DATE_FORMAT);
|
||||||
|
} else {
|
||||||
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
|
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
|
||||||
|
}
|
||||||
document.put("_ingest", ingestMap);
|
document.put("_ingest", ingestMap);
|
||||||
document.put("foo", "bar");
|
document.put("foo", "bar");
|
||||||
document.put("int", 123);
|
document.put("int", 123);
|
||||||
|
@ -72,7 +79,18 @@ public class IngestDocumentTests extends ESTestCase {
|
||||||
list.add(null);
|
list.add(null);
|
||||||
|
|
||||||
document.put("list", list);
|
document.put("list", list);
|
||||||
ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
|
return new IngestDocument("index", "type", "id", null, null, document, newDateFormat);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setIngestDocuments() {
|
||||||
|
ingestDocument = getTestIngestDocument(false);
|
||||||
|
ingestDocumentWithNewDateFormat = getTestIngestDocument(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultConstructorUsesDateClass() {
|
||||||
|
IngestDocument ingestDocument = new IngestDocument("foo", "bar", "baz", "fuzz", "buzz", Collections.emptyMap());
|
||||||
|
assertThat(ingestDocument.getFieldValue("_ingest.timestamp", Object.class).getClass(), equalTo(Date.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSimpleGetFieldValue() {
|
public void testSimpleGetFieldValue() {
|
||||||
|
@ -88,6 +106,13 @@ public class IngestDocumentTests extends ESTestCase {
|
||||||
assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", Date.class), equalTo(BOGUS_TIMESTAMP));
|
assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", Date.class), equalTo(BOGUS_TIMESTAMP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNewDateFormat() {
|
||||||
|
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_ingest.timestamp", ZonedDateTime.class),
|
||||||
|
both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT))));
|
||||||
|
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_source._ingest.timestamp", ZonedDateTime.class),
|
||||||
|
equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT));
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetSourceObject() {
|
public void testGetSourceObject() {
|
||||||
try {
|
try {
|
||||||
ingestDocument.getFieldValue("_source", Object.class);
|
ingestDocument.getFieldValue("_source", Object.class);
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.plugins.IngestPlugin;
|
import org.elasticsearch.plugins.IngestPlugin;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -39,7 +40,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
|
|
||||||
public void testIngestPlugin() {
|
public void testIngestPlugin() {
|
||||||
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
||||||
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
|
IngestService ingestService = new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||||
|
Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
|
||||||
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
|
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
|
||||||
assertTrue(factories.containsKey("foo"));
|
assertTrue(factories.containsKey("foo"));
|
||||||
assertEquals(1, factories.size());
|
assertEquals(1, factories.size());
|
||||||
|
@ -48,7 +50,8 @@ public class IngestServiceTests extends ESTestCase {
|
||||||
public void testIngestPluginDuplicate() {
|
public void testIngestPluginDuplicate() {
|
||||||
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
ThreadPool tp = Mockito.mock(ThreadPool.class);
|
||||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||||
new IngestService(Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
|
new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||||
|
Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
|
||||||
);
|
);
|
||||||
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
|
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -49,6 +50,7 @@ import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
public class PipelineStoreTests extends ESTestCase {
|
public class PipelineStoreTests extends ESTestCase {
|
||||||
|
|
||||||
|
private ClusterSettings clusterSettings;
|
||||||
private PipelineStore store;
|
private PipelineStore store;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -93,7 +95,8 @@ public class PipelineStoreTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
store = new PipelineStore(Settings.EMPTY, processorFactories);
|
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||||
|
store = new PipelineStore(clusterSettings, Settings.EMPTY, processorFactories);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdatePipelines() {
|
public void testUpdatePipelines() {
|
||||||
|
@ -369,4 +372,11 @@ public class PipelineStoreTests extends ESTestCase {
|
||||||
store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
|
store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUpdateIngestNewDateFormatSetting() throws Exception {
|
||||||
|
assertFalse(store.isNewIngestDateFormat());
|
||||||
|
clusterSettings.applySettings(Settings.builder().put(IngestService.NEW_INGEST_DATE_FORMAT.getKey(), true).build());
|
||||||
|
assertTrue(store.isNewIngestDateFormat());
|
||||||
|
assertWarnings("[ingest.new_date_format] setting was deprecated in Elasticsearch and will be " +
|
||||||
|
"removed in a future release! See the breaking changes documentation for the next major version.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
---
|
||||||
|
"Test timestamp templating does not match date-mapping defaults":
|
||||||
|
- do:
|
||||||
|
cluster.health:
|
||||||
|
wait_for_status: green
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: timetest
|
||||||
|
body:
|
||||||
|
mappings:
|
||||||
|
test: { "properties": { "my_time": {"type": "date"}}}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
ingest.put_pipeline:
|
||||||
|
id: "my_timely_pipeline"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"description": "_description",
|
||||||
|
"processors": [
|
||||||
|
{
|
||||||
|
"set" : {
|
||||||
|
"field": "my_time",
|
||||||
|
"value": "{{ _ingest.timestamp }}"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"date" : {
|
||||||
|
"field" : "my_time",
|
||||||
|
"target_field": "my_time",
|
||||||
|
"formats": ["EEE MMM dd HH:mm:ss zzz yyyy"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
- match: { acknowledged: true }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: timetest
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
pipeline: "my_timely_pipeline"
|
||||||
|
body: {}
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test timestamp templating matches date-mapping defaults with ingest.new_date_format":
|
||||||
|
- skip:
|
||||||
|
version: " - 5.3.99"
|
||||||
|
reason: deprecated in 5.4.0
|
||||||
|
features: "warnings"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
cluster.health:
|
||||||
|
wait_for_status: green
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: timetest_newdateformat
|
||||||
|
body:
|
||||||
|
mappings:
|
||||||
|
test: { "properties": { "my_time": {"type": "date"}}}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
cluster.put_settings:
|
||||||
|
body:
|
||||||
|
transient:
|
||||||
|
ingest.new_date_format: true
|
||||||
|
warnings:
|
||||||
|
- "[ingest.new_date_format] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
|
||||||
|
|
||||||
|
- match: {transient: {ingest: {new_date_format: "true"}}}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
ingest.put_pipeline:
|
||||||
|
id: "my_timely_pipeline_with_new_date_format"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"description": "_description",
|
||||||
|
"processors": [
|
||||||
|
{
|
||||||
|
"set" : {
|
||||||
|
"field": "my_time",
|
||||||
|
"value": "{{ _ingest.timestamp }}"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
- match: { acknowledged: true }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: timetest
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
pipeline: "my_timely_pipeline_with_new_date_format"
|
||||||
|
body: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
cluster.put_settings:
|
||||||
|
body:
|
||||||
|
transient:
|
||||||
|
ingest.new_date_format: false
|
||||||
|
warnings:
|
||||||
|
- "[ingest.new_date_format] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
|
||||||
|
|
||||||
|
- match: {transient: {ingest: {new_date_format: "false"}}}
|
Loading…
Reference in New Issue