This commit is adding an ability to remove pipelines with wildcards.
This commit is contained in:
parent
5dae10db11
commit
f6b6e4e376
|
@ -41,9 +41,11 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class PipelineStore extends AbstractComponent implements ClusterStateApplier {
|
public class PipelineStore extends AbstractComponent implements ClusterStateApplier {
|
||||||
|
|
||||||
|
@ -111,17 +113,26 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
|
Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
|
||||||
if (pipelines.containsKey(request.getId()) == false) {
|
Set<String> toRemove = new HashSet<>();
|
||||||
throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId());
|
for (String pipelineKey : pipelines.keySet()) {
|
||||||
} else {
|
if (Regex.simpleMatch(request.getId(), pipelineKey)) {
|
||||||
pipelines = new HashMap<>(pipelines);
|
toRemove.add(pipelineKey);
|
||||||
pipelines.remove(request.getId());
|
}
|
||||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
|
||||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
|
||||||
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
|
|
||||||
.build());
|
|
||||||
return newState.build();
|
|
||||||
}
|
}
|
||||||
|
if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) {
|
||||||
|
throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId());
|
||||||
|
} else if (toRemove.isEmpty()) {
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
|
final Map<String, PipelineConfiguration> pipelinesCopy = new HashMap<>(pipelines);
|
||||||
|
for (String key : toRemove) {
|
||||||
|
pipelinesCopy.remove(key);
|
||||||
|
}
|
||||||
|
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||||
|
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||||
|
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy))
|
||||||
|
.build());
|
||||||
|
return newState.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -192,6 +192,77 @@ public class PipelineStoreTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDeleteUsingWildcard() {
|
||||||
|
HashMap<String, PipelineConfiguration> pipelines = new HashMap<>();
|
||||||
|
BytesArray definition = new BytesArray(
|
||||||
|
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"
|
||||||
|
);
|
||||||
|
pipelines.put("p1", new PipelineConfiguration("p1", definition));
|
||||||
|
pipelines.put("p2", new PipelineConfiguration("p2", definition));
|
||||||
|
pipelines.put("q1", new PipelineConfiguration("q1", definition));
|
||||||
|
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
|
||||||
|
ClusterState previousClusterState = clusterState;
|
||||||
|
clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder()
|
||||||
|
.putCustom(IngestMetadata.TYPE, ingestMetadata)).build();
|
||||||
|
store.innerUpdatePipelines(previousClusterState, clusterState);
|
||||||
|
assertThat(store.get("p1"), notNullValue());
|
||||||
|
assertThat(store.get("p2"), notNullValue());
|
||||||
|
assertThat(store.get("q1"), notNullValue());
|
||||||
|
|
||||||
|
// Delete pipeline matching wildcard
|
||||||
|
DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*");
|
||||||
|
previousClusterState = clusterState;
|
||||||
|
clusterState = store.innerDelete(deleteRequest, clusterState);
|
||||||
|
store.innerUpdatePipelines(previousClusterState, clusterState);
|
||||||
|
assertThat(store.get("p1"), nullValue());
|
||||||
|
assertThat(store.get("p2"), nullValue());
|
||||||
|
assertThat(store.get("q1"), notNullValue());
|
||||||
|
|
||||||
|
// Exception if we used name which does not exist
|
||||||
|
try {
|
||||||
|
store.innerDelete(new DeletePipelineRequest("unknown"), clusterState);
|
||||||
|
fail("exception expected");
|
||||||
|
} catch (ResourceNotFoundException e) {
|
||||||
|
assertThat(e.getMessage(), equalTo("pipeline [unknown] is missing"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// match all wildcard works on last remaining pipeline
|
||||||
|
DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*");
|
||||||
|
previousClusterState = clusterState;
|
||||||
|
clusterState = store.innerDelete(matchAllDeleteRequest, clusterState);
|
||||||
|
store.innerUpdatePipelines(previousClusterState, clusterState);
|
||||||
|
assertThat(store.get("p1"), nullValue());
|
||||||
|
assertThat(store.get("p2"), nullValue());
|
||||||
|
assertThat(store.get("q1"), nullValue());
|
||||||
|
|
||||||
|
// match all wildcard does not throw exception if none match
|
||||||
|
store.innerDelete(matchAllDeleteRequest, clusterState);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDeleteWithExistingUnmatchedPipelines() {
|
||||||
|
HashMap<String, PipelineConfiguration> pipelines = new HashMap<>();
|
||||||
|
BytesArray definition = new BytesArray(
|
||||||
|
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"
|
||||||
|
);
|
||||||
|
pipelines.put("p1", new PipelineConfiguration("p1", definition));
|
||||||
|
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
|
||||||
|
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
|
||||||
|
ClusterState previousClusterState = clusterState;
|
||||||
|
clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder()
|
||||||
|
.putCustom(IngestMetadata.TYPE, ingestMetadata)).build();
|
||||||
|
store.innerUpdatePipelines(previousClusterState, clusterState);
|
||||||
|
assertThat(store.get("p1"), notNullValue());
|
||||||
|
|
||||||
|
DeletePipelineRequest deleteRequest = new DeletePipelineRequest("z*");
|
||||||
|
try {
|
||||||
|
store.innerDelete(deleteRequest, clusterState);
|
||||||
|
fail("exception expected");
|
||||||
|
} catch (ResourceNotFoundException e) {
|
||||||
|
assertThat(e.getMessage(), equalTo("pipeline [z*] is missing"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetPipelines() {
|
public void testGetPipelines() {
|
||||||
Map<String, PipelineConfiguration> configs = new HashMap<>();
|
Map<String, PipelineConfiguration> configs = new HashMap<>();
|
||||||
configs.put("_id1", new PipelineConfiguration(
|
configs.put("_id1", new PipelineConfiguration(
|
||||||
|
|
|
@ -50,6 +50,46 @@ teardown:
|
||||||
catch: missing
|
catch: missing
|
||||||
ingest.get_pipeline:
|
ingest.get_pipeline:
|
||||||
id: "my_pipeline"
|
id: "my_pipeline"
|
||||||
|
---
|
||||||
|
"Test wildcard pipeline delete":
|
||||||
|
- do:
|
||||||
|
ingest.put_pipeline:
|
||||||
|
id: "my_pipeline"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"description": "_description",
|
||||||
|
"processors": [
|
||||||
|
{
|
||||||
|
"set" : {
|
||||||
|
"field" : "field2",
|
||||||
|
"value": "_value"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
- match: { acknowledged: true }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
ingest.get_pipeline:
|
||||||
|
id: "my_pipeline"
|
||||||
|
- match: { my_pipeline.description: "_description" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
ingest.delete_pipeline:
|
||||||
|
id: "my_*"
|
||||||
|
- match: { acknowledged: true }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
catch: missing
|
||||||
|
ingest.get_pipeline:
|
||||||
|
id: "my_pipeline"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
catch: missing
|
||||||
|
ingest.delete_pipeline:
|
||||||
|
id: "my_*"
|
||||||
|
- match: { "error.type": "resource_not_found_exception" }
|
||||||
|
- match: { "error.reason": "pipeline [my_*] is missing" }
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test Get All Pipelines":
|
"Test Get All Pipelines":
|
||||||
|
|
Loading…
Reference in New Issue