Prior to this commit (and after 6.5.0), if an ingest node changes the _index in a pipeline, the original target index would be created. For daily indexes this could create an extra, empty index per day. This commit changes the TransportBulkAction to execute the ingest node pipeline before attempting to create the index. This ensures that the only index created is the original or one set by the ingest node pipeline. This was the execution order prior to 6.5.0 (#32786). The execution order was changed in 6.5 to better support default pipelines. Specifically the execution order was changed to be able to read the settings from the index meta data. This commit also includes a change in logic such that if the target index does not exist when ingest node pipeline runs, it will now pull the default pipeline (if one exists) from the settings of the best matched of the index template. Relates #32786 Relates #32758 Closes #36545
This commit is contained in:
parent
8ec456b5df
commit
797d6b8a66
|
@ -271,28 +271,6 @@ POST test/_doc/1?pipeline=drop_guests_network
|
|||
// CONSOLE
|
||||
// TEST[continued]
|
||||
|
||||
////
|
||||
Hidden example assertion:
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET test/_doc/1
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
|
||||
// TEST[catch:missing]
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"_index": "test",
|
||||
"_type": "_doc",
|
||||
"_id": "1",
|
||||
"found": false
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
////
|
||||
|
||||
Thanks to the `?.` operator the following document will not throw an error.
|
||||
If the pipeline used a `.` the following document would throw a NullPointerException
|
||||
since the `network` object is not part of the source document.
|
||||
|
@ -392,28 +370,6 @@ POST test/_doc/3?pipeline=drop_guests_network
|
|||
// CONSOLE
|
||||
// TEST[continued]
|
||||
|
||||
////
|
||||
Hidden example assertion:
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET test/_doc/3
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
|
||||
// TEST[catch:missing]
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"_index": "test",
|
||||
"_type": "_doc",
|
||||
"_id": "3",
|
||||
"found": false
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
////
|
||||
|
||||
The `?.` operators works well for use in the `if` conditional
|
||||
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
|
||||
returns null if the object is null and `==` is null safe (as well as many other
|
||||
|
@ -511,28 +467,6 @@ POST test/_doc/1?pipeline=not_prod_dropper
|
|||
The document is <<drop-processor,dropped>> since `prod` (case insensitive)
|
||||
is not found in the tags.
|
||||
|
||||
////
|
||||
Hidden example assertion:
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET test/_doc/1
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
|
||||
// TEST[catch:missing]
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"_index": "test",
|
||||
"_type": "_doc",
|
||||
"_id": "1",
|
||||
"found": false
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
////
|
||||
|
||||
The following document is indexed (i.e. not dropped) since
|
||||
`prod` (case insensitive) is found in the tags.
|
||||
|
||||
|
|
|
@ -91,4 +91,4 @@ teardown:
|
|||
get:
|
||||
index: test
|
||||
id: 3
|
||||
- match: { found: false }
|
||||
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
---
|
||||
teardown:
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "retarget"
|
||||
ignore: 404
|
||||
|
||||
- do:
|
||||
indices.delete:
|
||||
index: foo
|
||||
|
||||
---
|
||||
"Test Change Target Index with Explicit Pipeline":
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "retarget"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "_index",
|
||||
"value" : "foo"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
# no indices
|
||||
- do:
|
||||
cat.indices: {}
|
||||
|
||||
- match:
|
||||
$body: |
|
||||
/^$/
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: 1
|
||||
pipeline: "retarget"
|
||||
body: {
|
||||
a: true
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: foo
|
||||
id: 1
|
||||
- match: { _source.a: true }
|
||||
|
||||
# only the foo index
|
||||
- do:
|
||||
cat.indices:
|
||||
h: i
|
||||
|
||||
- match:
|
||||
$body: |
|
||||
/^foo\n$/
|
||||
|
||||
---
|
||||
"Test Change Target Index with Default Pipeline":
|
||||
|
||||
- do:
|
||||
indices.put_template:
|
||||
name: index_template
|
||||
body:
|
||||
index_patterns: test
|
||||
settings:
|
||||
default_pipeline: "retarget"
|
||||
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "retarget"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "_index",
|
||||
"value" : "foo"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
# no indices
|
||||
- do:
|
||||
cat.indices: {}
|
||||
|
||||
- match:
|
||||
$body: |
|
||||
/^$/
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
id: 1
|
||||
body: {
|
||||
a: true
|
||||
}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: foo
|
||||
id: 1
|
||||
- match: { _source.a: true }
|
||||
|
||||
# only the foo index
|
||||
- do:
|
||||
cat.indices:
|
||||
h: i
|
||||
|
||||
- match:
|
||||
$body: |
|
||||
/^foo\n$/
|
|
@ -47,11 +47,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|||
import org.elasticsearch.cluster.metadata.AliasOrIndex;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
|
@ -151,6 +154,72 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
final long startTime = relativeTime();
|
||||
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
||||
|
||||
boolean hasIndexRequestsWithPipelines = false;
|
||||
final MetaData metaData = clusterService.state().getMetaData();
|
||||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
|
||||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
|
||||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
|
||||
if (indexRequest != null) {
|
||||
// get pipeline from request
|
||||
String pipeline = indexRequest.getPipeline();
|
||||
if (pipeline == null) {
|
||||
// start to look for default pipeline via settings found in the index meta data
|
||||
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
|
||||
if (indexMetaData == null && indexRequest.index() != null) {
|
||||
// if the write request if through an alias use the write index's meta data
|
||||
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
|
||||
if (indexOrAlias != null && indexOrAlias.isAlias()) {
|
||||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
|
||||
indexMetaData = alias.getWriteIndex();
|
||||
}
|
||||
}
|
||||
if (indexMetaData != null) {
|
||||
// Find the the default pipeline if one is defined from and existing index.
|
||||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
|
||||
indexRequest.setPipeline(defaultPipeline);
|
||||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
|
||||
hasIndexRequestsWithPipelines = true;
|
||||
}
|
||||
} else if (indexRequest.index() != null) {
|
||||
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
|
||||
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
|
||||
assert (templates != null);
|
||||
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
|
||||
// order of templates are highest order first, break if we find a default_pipeline
|
||||
for (IndexTemplateMetaData template : templates) {
|
||||
final Settings settings = template.settings();
|
||||
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
|
||||
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
|
||||
break;
|
||||
}
|
||||
}
|
||||
indexRequest.setPipeline(defaultPipeline);
|
||||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
|
||||
hasIndexRequestsWithPipelines = true;
|
||||
}
|
||||
}
|
||||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
|
||||
hasIndexRequestsWithPipelines = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasIndexRequestsWithPipelines) {
|
||||
// this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
|
||||
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
|
||||
// this path is never taken.
|
||||
try {
|
||||
if (clusterService.localNode().isIngestNode()) {
|
||||
processBulkIndexIngestRequest(task, bulkRequest, listener);
|
||||
} else {
|
||||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (needToCheck()) {
|
||||
// Attempt to create all the indices that we're going to need during the bulk before we start.
|
||||
// Step 1: collect all the indices in the request
|
||||
|
@ -181,7 +250,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
}
|
||||
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
|
||||
if (autoCreateIndices.isEmpty()) {
|
||||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
|
||||
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
|
||||
} else {
|
||||
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
|
||||
for (String index : autoCreateIndices) {
|
||||
|
@ -189,7 +258,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
|
||||
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,7 +274,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
}
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
|
||||
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
|
||||
inner.addSuppressed(e);
|
||||
listener.onFailure(inner);
|
||||
}), responses, indicesThatCannotBeCreated);
|
||||
|
@ -215,56 +284,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
}
|
||||
}
|
||||
} else {
|
||||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
|
||||
}
|
||||
}
|
||||
|
||||
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
|
||||
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
|
||||
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
|
||||
boolean hasIndexRequestsWithPipelines = false;
|
||||
final MetaData metaData = clusterService.state().getMetaData();
|
||||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
|
||||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
|
||||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
|
||||
if(indexRequest != null){
|
||||
String pipeline = indexRequest.getPipeline();
|
||||
if (pipeline == null) {
|
||||
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
|
||||
if (indexMetaData == null && indexRequest.index() != null) {
|
||||
//check the alias
|
||||
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
|
||||
if (indexOrAlias != null && indexOrAlias.isAlias()) {
|
||||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
|
||||
indexMetaData = alias.getWriteIndex();
|
||||
}
|
||||
}
|
||||
if (indexMetaData == null) {
|
||||
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
|
||||
} else {
|
||||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
|
||||
indexRequest.setPipeline(defaultPipeline);
|
||||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
|
||||
hasIndexRequestsWithPipelines = true;
|
||||
}
|
||||
}
|
||||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
|
||||
hasIndexRequestsWithPipelines = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hasIndexRequestsWithPipelines) {
|
||||
try {
|
||||
if (clusterService.localNode().isIngestNode()) {
|
||||
processBulkIndexIngestRequest(task, bulkRequest, listener);
|
||||
} else {
|
||||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
|
||||
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterStateApplier;
|
|||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -57,6 +58,7 @@ import org.mockito.ArgumentCaptor;
|
|||
import org.mockito.Captor;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
@ -460,7 +462,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
|||
verifyZeroInteractions(transportService);
|
||||
}
|
||||
|
||||
public void testCreateIndexBeforeRunPipeline() throws Exception {
|
||||
public void testDoExecuteCalledTwiceCorrectly() throws Exception {
|
||||
Exception exception = new Exception("fake exception");
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.setPipeline("testpipeline");
|
||||
|
@ -478,20 +480,76 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
|||
|
||||
// check failure works, and passes through to the listener
|
||||
assertFalse(action.isExecuted); // haven't executed yet
|
||||
assertFalse(action.indexCreated); // no index yet
|
||||
assertFalse(responseCalled.get());
|
||||
assertFalse(failureCalled.get());
|
||||
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
|
||||
completionHandler.getValue().accept(exception);
|
||||
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
|
||||
assertTrue(failureCalled.get());
|
||||
|
||||
// now check success
|
||||
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
|
||||
completionHandler.getValue().accept(null);
|
||||
assertTrue(action.isExecuted);
|
||||
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
|
||||
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
|
||||
verifyZeroInteractions(transportService);
|
||||
}
|
||||
|
||||
public void testNotFindDefaultPipelineFromTemplateMatches(){
|
||||
Exception exception = new Exception("fake exception");
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
|
||||
response -> responseCalled.set(true),
|
||||
e -> {
|
||||
assertThat(e, sameInstance(exception));
|
||||
failureCalled.set(true);
|
||||
}));
|
||||
assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline());
|
||||
verifyZeroInteractions(ingestService);
|
||||
|
||||
}
|
||||
|
||||
public void testFindDefaultPipelineFromTemplateMatch(){
|
||||
Exception exception = new Exception("fake exception");
|
||||
ClusterState state = clusterService.state();
|
||||
|
||||
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> templateMetaDataBuilder = ImmutableOpenMap.builder();
|
||||
templateMetaDataBuilder.put("template1", IndexTemplateMetaData.builder("template1").patterns(Arrays.asList("missing_index"))
|
||||
.order(1).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline1").build()).build());
|
||||
templateMetaDataBuilder.put("template2", IndexTemplateMetaData.builder("template2").patterns(Arrays.asList("missing_*"))
|
||||
.order(2).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build()).build());
|
||||
templateMetaDataBuilder.put("template3", IndexTemplateMetaData.builder("template3").patterns(Arrays.asList("missing*"))
|
||||
.order(3).build());
|
||||
templateMetaDataBuilder.put("template4", IndexTemplateMetaData.builder("template4").patterns(Arrays.asList("nope"))
|
||||
.order(4).settings(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline4").build()).build());
|
||||
|
||||
MetaData metaData = mock(MetaData.class);
|
||||
when(state.metaData()).thenReturn(metaData);
|
||||
when(state.getMetaData()).thenReturn(metaData);
|
||||
when(metaData.templates()).thenReturn(templateMetaDataBuilder.build());
|
||||
when(metaData.getTemplates()).thenReturn(templateMetaDataBuilder.build());
|
||||
when(metaData.indices()).thenReturn(ImmutableOpenMap.of());
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
AtomicBoolean responseCalled = new AtomicBoolean(false);
|
||||
AtomicBoolean failureCalled = new AtomicBoolean(false);
|
||||
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
|
||||
response -> responseCalled.set(true),
|
||||
e -> {
|
||||
assertThat(e, sameInstance(exception));
|
||||
failureCalled.set(true);
|
||||
}));
|
||||
|
||||
assertEquals("pipeline2", indexRequest.getPipeline());
|
||||
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
|
||||
}
|
||||
|
||||
private void validateDefaultPipeline(IndexRequest indexRequest) {
|
||||
Exception exception = new Exception("fake exception");
|
||||
indexRequest.source(Collections.emptyMap());
|
||||
|
|
Loading…
Reference in New Issue