diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java b/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java index 32fade45e9d..b93eff73140 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java @@ -19,7 +19,9 @@ package org.elasticsearch.ingest; -import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.Diffable; +import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,7 +41,7 @@ import java.util.Map; /** * Holds the ingest pipelines that are available in the cluster */ -public final class IngestMetadata extends AbstractDiffable implements MetaData.Custom { +public final class IngestMetadata implements MetaData.Custom { public final static String TYPE = "ingest"; public final static IngestMetadata PROTO = new IngestMetadata(); @@ -50,7 +52,6 @@ public final class IngestMetadata extends AbstractDiffable impl INGEST_METADATA_PARSER.declareObjectArray(List::addAll , PipelineConfiguration.getParser(), PIPELINES_FIELD); } - // We can't use Pipeline class directly in cluster state, because we don't have the processor factories around when // IngestMetadata is registered as custom metadata. private final Map pipelines; @@ -73,7 +74,7 @@ public final class IngestMetadata extends AbstractDiffable impl } @Override - public MetaData.Custom readFrom(StreamInput in) throws IOException { + public IngestMetadata readFrom(StreamInput in) throws IOException { int size = in.readVInt(); Map pipelines = new HashMap<>(size); for (int i = 0; i < size; i++) { @@ -92,7 +93,7 @@ public final class IngestMetadata extends AbstractDiffable impl } @Override - public MetaData.Custom fromXContent(XContentParser parser) throws IOException { + public IngestMetadata fromXContent(XContentParser parser) throws IOException { Map pipelines = new HashMap<>(); List configs = INGEST_METADATA_PARSER.parse(parser); for (PipelineConfiguration pipeline : configs) { @@ -116,4 +117,52 @@ public final class IngestMetadata extends AbstractDiffable impl return MetaData.API_AND_GATEWAY; } + @Override + public Diff diff(MetaData.Custom before) { + return new IngestMetadataDiff((IngestMetadata) before, this); + } + + @Override + public Diff readDiffFrom(StreamInput in) throws IOException { + return new IngestMetadataDiff(in); + } + + static class IngestMetadataDiff implements Diff { + + final Diff> pipelines; + + IngestMetadataDiff(IngestMetadata before, IngestMetadata after) { + this.pipelines = DiffableUtils.diff(before.pipelines, after.pipelines, DiffableUtils.getStringKeySerializer()); + } + + public IngestMetadataDiff(StreamInput in) throws IOException { + pipelines = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), PipelineConfiguration.PROTOTYPE); + } + + @Override + public MetaData.Custom apply(MetaData.Custom part) { + return new IngestMetadata(pipelines.apply(((IngestMetadata) part).pipelines)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + pipelines.writeTo(out); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + IngestMetadata that = (IngestMetadata) o; + + return pipelines.equals(that.pipelines); + + } + + @Override + public int hashCode() { + return pipelines.hashCode(); + } } diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java b/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java index 3bd80edc306..99980f6d82d 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineConfiguration.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest; +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diffable; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -31,15 +34,20 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.function.BiFunction; /** * Encapsulates a pipeline's id and configuration as a blob */ -public final class PipelineConfiguration implements Writeable, ToXContent { +public final class PipelineConfiguration extends AbstractDiffable + implements Writeable, ToXContent { - private final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null); + final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null); public static PipelineConfiguration readPipelineConfiguration(StreamInput in) throws IOException { return PROTOTYPE.readFrom(in); @@ -61,6 +69,7 @@ public final class PipelineConfiguration implements Writeable customs = Collections.emptyMap(); void setId(String id) { this.id = id; @@ -113,4 +122,22 @@ public final class PipelineConfiguration implements Writeable pipelines = new HashMap<>(); + pipelines.put("1", new PipelineConfiguration("1", pipelineConfig)); + pipelines.put("2", new PipelineConfiguration("2", pipelineConfig)); + IngestMetadata ingestMetadata1 = new IngestMetadata(pipelines); + + pipelines = new HashMap<>(); + pipelines.put("1", new PipelineConfiguration("1", pipelineConfig)); + pipelines.put("3", new PipelineConfiguration("3", pipelineConfig)); + pipelines.put("4", new PipelineConfiguration("4", pipelineConfig)); + IngestMetadata ingestMetadata2 = new IngestMetadata(pipelines); + + IngestMetadata.IngestMetadataDiff diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata2.diff(ingestMetadata1); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().size(), equalTo(1)); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().get(0), equalTo("2")); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().size(), equalTo(2)); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().containsKey("3"), is(true)); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().containsKey("4"), is(true)); + + IngestMetadata endResult = (IngestMetadata) diff.apply(ingestMetadata2); + assertThat(endResult, not(equalTo(ingestMetadata1))); + assertThat(endResult.getPipelines().size(), equalTo(3)); + assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig))); + assertThat(endResult.getPipelines().get("3"), equalTo(new PipelineConfiguration("3", pipelineConfig))); + assertThat(endResult.getPipelines().get("4"), equalTo(new PipelineConfiguration("4", pipelineConfig))); + + pipelines = new HashMap<>(); + pipelines.put("1", new PipelineConfiguration("1", new BytesArray("{}"))); + pipelines.put("2", new PipelineConfiguration("2", new BytesArray("{}"))); + IngestMetadata ingestMetadata3 = new IngestMetadata(pipelines); + + diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata3.diff(ingestMetadata1); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().size(), equalTo(0)); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().size(), equalTo(0)); + + endResult = (IngestMetadata) diff.apply(ingestMetadata3); + assertThat(endResult, equalTo(ingestMetadata1)); + assertThat(endResult.getPipelines().size(), equalTo(2)); + assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig))); + assertThat(endResult.getPipelines().get("2"), equalTo(new PipelineConfiguration("2", pipelineConfig))); + + pipelines = new HashMap<>(); + pipelines.put("1", new PipelineConfiguration("1", new BytesArray("{}"))); + pipelines.put("2", new PipelineConfiguration("2", new BytesArray("{\"key\" : \"value\"}"))); + IngestMetadata ingestMetadata4 = new IngestMetadata(pipelines); + + diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata4.diff(ingestMetadata1); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDiffs().size(), equalTo(1)); + assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDiffs().containsKey("2"), is(true)); + + endResult = (IngestMetadata) diff.apply(ingestMetadata4); + assertThat(endResult, not(equalTo(ingestMetadata1))); + assertThat(endResult.getPipelines().size(), equalTo(2)); + assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig))); + assertThat(endResult.getPipelines().get("2"), equalTo(new PipelineConfiguration("2", new BytesArray("{\"key\" : \"value\"}")))); + } }