ingest: use diffs for ingest metadata

This commit is contained in:
Martijn van Groningen 2016-02-24 15:54:57 -08:00
parent d928352df5
commit a5c0f77f1d
3 changed files with 149 additions and 7 deletions

View File

@ -19,7 +19,9 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@ -39,7 +41,7 @@ import java.util.Map;
/**
* Holds the ingest pipelines that are available in the cluster
*/
public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
public final class IngestMetadata implements MetaData.Custom {
public final static String TYPE = "ingest";
public final static IngestMetadata PROTO = new IngestMetadata();
@ -50,7 +52,6 @@ public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> impl
INGEST_METADATA_PARSER.declareObjectArray(List::addAll , PipelineConfiguration.getParser(), PIPELINES_FIELD);
}
// We can't use Pipeline class directly in cluster state, because we don't have the processor factories around when
// IngestMetadata is registered as custom metadata.
private final Map<String, PipelineConfiguration> pipelines;
@ -73,7 +74,7 @@ public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> impl
}
@Override
public MetaData.Custom readFrom(StreamInput in) throws IOException {
public IngestMetadata readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
Map<String, PipelineConfiguration> pipelines = new HashMap<>(size);
for (int i = 0; i < size; i++) {
@ -92,7 +93,7 @@ public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> impl
}
@Override
public MetaData.Custom fromXContent(XContentParser parser) throws IOException {
public IngestMetadata fromXContent(XContentParser parser) throws IOException {
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
List<PipelineConfiguration> configs = INGEST_METADATA_PARSER.parse(parser);
for (PipelineConfiguration pipeline : configs) {
@ -116,4 +117,52 @@ public final class IngestMetadata extends AbstractDiffable<MetaData.Custom> impl
return MetaData.API_AND_GATEWAY;
}
@Override
public Diff<MetaData.Custom> diff(MetaData.Custom before) {
return new IngestMetadataDiff((IngestMetadata) before, this);
}
@Override
public Diff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
return new IngestMetadataDiff(in);
}
static class IngestMetadataDiff implements Diff<MetaData.Custom> {
final Diff<Map<String, PipelineConfiguration>> pipelines;
IngestMetadataDiff(IngestMetadata before, IngestMetadata after) {
this.pipelines = DiffableUtils.diff(before.pipelines, after.pipelines, DiffableUtils.getStringKeySerializer());
}
public IngestMetadataDiff(StreamInput in) throws IOException {
pipelines = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), PipelineConfiguration.PROTOTYPE);
}
@Override
public MetaData.Custom apply(MetaData.Custom part) {
return new IngestMetadata(pipelines.apply(((IngestMetadata) part).pipelines));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
pipelines.writeTo(out);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IngestMetadata that = (IngestMetadata) o;
return pipelines.equals(that.pipelines);
}
@Override
public int hashCode() {
return pipelines.hashCode();
}
}

View File

@ -19,6 +19,9 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -31,15 +34,20 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
/**
* Encapsulates a pipeline's id and configuration as a blob
*/
public final class PipelineConfiguration implements Writeable<PipelineConfiguration>, ToXContent {
public final class PipelineConfiguration extends AbstractDiffable<PipelineConfiguration>
implements Writeable<PipelineConfiguration>, ToXContent {
private final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null);
final static PipelineConfiguration PROTOTYPE = new PipelineConfiguration(null, null);
public static PipelineConfiguration readPipelineConfiguration(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
@ -61,6 +69,7 @@ public final class PipelineConfiguration implements Writeable<PipelineConfigurat
private String id;
private BytesReference config;
private Map<String, MetaData.Custom> customs = Collections.emptyMap();
void setId(String id) {
this.id = id;
@ -113,4 +122,22 @@ public final class PipelineConfiguration implements Writeable<PipelineConfigurat
out.writeBytesReference(config);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PipelineConfiguration that = (PipelineConfiguration) o;
if (!id.equals(that.id)) return false;
return config.equals(that.config);
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + config.hashCode();
return result;
}
}

View File

@ -19,8 +19,11 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -32,6 +35,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class IngestMetadataTests extends ESTestCase {
public void testFromXContent() throws IOException {
@ -61,4 +68,63 @@ public class IngestMetadataTests extends ESTestCase {
assertEquals(pipeline.getConfigAsMap(), m.getPipelines().get("1").getConfigAsMap());
assertEquals(pipeline2.getConfigAsMap(), m.getPipelines().get("2").getConfigAsMap());
}
public void testDiff() throws Exception {
BytesReference pipelineConfig = new BytesArray("{}");
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
pipelines.put("1", new PipelineConfiguration("1", pipelineConfig));
pipelines.put("2", new PipelineConfiguration("2", pipelineConfig));
IngestMetadata ingestMetadata1 = new IngestMetadata(pipelines);
pipelines = new HashMap<>();
pipelines.put("1", new PipelineConfiguration("1", pipelineConfig));
pipelines.put("3", new PipelineConfiguration("3", pipelineConfig));
pipelines.put("4", new PipelineConfiguration("4", pipelineConfig));
IngestMetadata ingestMetadata2 = new IngestMetadata(pipelines);
IngestMetadata.IngestMetadataDiff diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata2.diff(ingestMetadata1);
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().size(), equalTo(1));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().get(0), equalTo("2"));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().size(), equalTo(2));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().containsKey("3"), is(true));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().containsKey("4"), is(true));
IngestMetadata endResult = (IngestMetadata) diff.apply(ingestMetadata2);
assertThat(endResult, not(equalTo(ingestMetadata1)));
assertThat(endResult.getPipelines().size(), equalTo(3));
assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig)));
assertThat(endResult.getPipelines().get("3"), equalTo(new PipelineConfiguration("3", pipelineConfig)));
assertThat(endResult.getPipelines().get("4"), equalTo(new PipelineConfiguration("4", pipelineConfig)));
pipelines = new HashMap<>();
pipelines.put("1", new PipelineConfiguration("1", new BytesArray("{}")));
pipelines.put("2", new PipelineConfiguration("2", new BytesArray("{}")));
IngestMetadata ingestMetadata3 = new IngestMetadata(pipelines);
diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata3.diff(ingestMetadata1);
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDeletes().size(), equalTo(0));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getUpserts().size(), equalTo(0));
endResult = (IngestMetadata) diff.apply(ingestMetadata3);
assertThat(endResult, equalTo(ingestMetadata1));
assertThat(endResult.getPipelines().size(), equalTo(2));
assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig)));
assertThat(endResult.getPipelines().get("2"), equalTo(new PipelineConfiguration("2", pipelineConfig)));
pipelines = new HashMap<>();
pipelines.put("1", new PipelineConfiguration("1", new BytesArray("{}")));
pipelines.put("2", new PipelineConfiguration("2", new BytesArray("{\"key\" : \"value\"}")));
IngestMetadata ingestMetadata4 = new IngestMetadata(pipelines);
diff = (IngestMetadata.IngestMetadataDiff) ingestMetadata4.diff(ingestMetadata1);
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDiffs().size(), equalTo(1));
assertThat(((DiffableUtils.MapDiff)diff.pipelines).getDiffs().containsKey("2"), is(true));
endResult = (IngestMetadata) diff.apply(ingestMetadata4);
assertThat(endResult, not(equalTo(ingestMetadata1)));
assertThat(endResult.getPipelines().size(), equalTo(2));
assertThat(endResult.getPipelines().get("1"), equalTo(new PipelineConfiguration("1", pipelineConfig)));
assertThat(endResult.getPipelines().get("2"), equalTo(new PipelineConfiguration("2", new BytesArray("{\"key\" : \"value\"}"))));
}
}