From 342665300b7b2f7eee16d6b2dae800294510bba9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 4 Dec 2015 11:24:22 +0100 Subject: [PATCH] Add capabilities for serializing map-based cluster state diffs - Supports ImmutableOpenIntMap besides java.util.Map and ImmutableOpenMap - Map keys can be any value (not only String) - Map values do not have to implement Diffable interface. In that case custom value serializer needs to be provided. --- .../java/org/elasticsearch/cluster/Diff.java | 2 +- .../elasticsearch/cluster/DiffableUtils.java | 601 ++++++++++++++---- .../cluster/routing/RoutingTable.java | 4 +- .../cluster/serialization/DiffableTests.java | 431 +++++++++++-- .../zen/NodeJoinControllerTests.java | 6 +- 5 files changed, 876 insertions(+), 168 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/Diff.java b/core/src/main/java/org/elasticsearch/cluster/Diff.java index 1a9fff246a9..76535a4b763 100644 --- a/core/src/main/java/org/elasticsearch/cluster/Diff.java +++ b/core/src/main/java/org/elasticsearch/cluster/Diff.java @@ -29,7 +29,7 @@ import java.io.IOException; public interface Diff { /** - * Applies difference to the specified part and retunrs the resulted part + * Applies difference to the specified part and returns the resulted part */ T apply(T part); diff --git a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index 84e0021ee00..1488f059437 100644 --- a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -19,263 +19,630 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.cursors.IntCursor; +import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public final class DiffableUtils { private DiffableUtils() { } + /** + * Returns a map key serializer for String keys + */ + public static KeySerializer getStringKeySerializer() { + return StringKeySerializer.INSTANCE; + } + + /** + * Returns a map key serializer for Integer keys. Encodes as Int. + */ + public static KeySerializer getIntKeySerializer() { + return IntKeySerializer.INSTANCE; + } + + /** + * Returns a map key serializer for Integer keys. Encodes as VInt. + */ + public static KeySerializer getVIntKeySerializer() { + return VIntKeySerializer.INSTANCE; + } + /** * Calculates diff between two ImmutableOpenMaps of Diffable objects */ - public static > Diff> diff(ImmutableOpenMap before, ImmutableOpenMap after) { + public static > MapDiff> diff(ImmutableOpenMap before, ImmutableOpenMap after, KeySerializer keySerializer) { assert after != null && before != null; - return new ImmutableOpenMapDiff<>(before, after); + return new ImmutableOpenMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + + /** + * Calculates diff between two ImmutableOpenMaps of non-diffable objects + */ + public static MapDiff> diff(ImmutableOpenMap before, ImmutableOpenMap after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new ImmutableOpenMapDiff<>(before, after, keySerializer, valueSerializer); + } + + /** + * Calculates diff between two ImmutableOpenIntMaps of Diffable objects + */ + public static > MapDiff> diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, KeySerializer keySerializer) { + assert after != null && before != null; + return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + + /** + * Calculates diff between two ImmutableOpenIntMaps of non-diffable objects + */ + public static MapDiff> diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, valueSerializer); } /** * Calculates diff between two Maps of Diffable objects. */ - public static > Diff> diff(Map before, Map after) { + public static > MapDiff> diff(Map before, Map after, KeySerializer keySerializer) { assert after != null && before != null; - return new JdkMapDiff<>(before, after); + return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + + /** + * Calculates diff between two Maps of non-diffable objects + */ + public static MapDiff> diff(Map before, Map after, KeySerializer keySerializer, NonDiffableValueSerializer valueSerializer) { + assert after != null && before != null; + return new JdkMapDiff<>(before, after, keySerializer, valueSerializer); } /** * Loads an object that represents difference between two ImmutableOpenMaps */ - public static > Diff> readImmutableOpenMapDiff(StreamInput in, KeyedReader keyedReader) throws IOException { - return new ImmutableOpenMapDiff<>(in, keyedReader); - } - - /** - * Loads an object that represents difference between two Maps. - */ - public static > Diff> readJdkMapDiff(StreamInput in, KeyedReader keyedReader) throws IOException { - return new JdkMapDiff<>(in, keyedReader); + public static MapDiff> readImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new ImmutableOpenMapDiff<>(in, keySerializer, valueSerializer); } /** * Loads an object that represents difference between two ImmutableOpenMaps */ - public static > Diff> readImmutableOpenMapDiff(StreamInput in, T proto) throws IOException { - return new ImmutableOpenMapDiff<>(in, new PrototypeReader<>(proto)); + public static MapDiff> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new ImmutableOpenIntMapDiff<>(in, keySerializer, valueSerializer); } /** - * Loads an object that represents difference between two Maps. + * Loads an object that represents difference between two Maps of Diffable objects */ - public static > Diff> readJdkMapDiff(StreamInput in, T proto) throws IOException { - return new JdkMapDiff<>(in, new PrototypeReader<>(proto)); + public static MapDiff> readJdkMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + return new JdkMapDiff<>(in, keySerializer, valueSerializer); } /** - * A reader that can deserialize an object. The reader can select the deserialization type based on the key. It's - * used in custom metadata deserialization. + * Loads an object that represents difference between two ImmutableOpenMaps of Diffable objects using Diffable proto object */ - public interface KeyedReader { - - /** - * reads an object of the type T from the stream input - */ - T readFrom(StreamInput in, String key) throws IOException; - - /** - * reads an object that respresents differences between two objects with the type T from the stream input - */ - Diff readDiffFrom(StreamInput in, String key) throws IOException; + public static > MapDiff> readImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new ImmutableOpenMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); } /** - * Implementation of the KeyedReader that is using a prototype object for reading operations - * - * Note: this implementation is ignoring the key. + * Loads an object that represents difference between two ImmutableOpenIntMaps of Diffable objects using Diffable proto object */ - public static class PrototypeReader> implements KeyedReader { - private T proto; - - public PrototypeReader(T proto) { - this.proto = proto; - } - - @Override - public T readFrom(StreamInput in, String key) throws IOException { - return proto.readFrom(in); - } - - @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { - return proto.readDiffFrom(in); - } + public static > MapDiff> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new ImmutableOpenIntMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); } /** - * Represents differences between two Maps of Diffable objects. + * Loads an object that represents difference between two Maps of Diffable objects using Diffable proto object + */ + public static > MapDiff> readJdkMapDiff(StreamInput in, KeySerializer keySerializer, T proto) throws IOException { + return new JdkMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto)); + } + + /** + * Represents differences between two Maps of (possibly diffable) objects. * * @param the diffable object */ - private static class JdkMapDiff> extends MapDiff> { + private static class JdkMapDiff extends MapDiff> { - protected JdkMapDiff(StreamInput in, KeyedReader reader) throws IOException { - super(in, reader); + protected JdkMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - public JdkMapDiff(Map before, Map after) { + public JdkMapDiff(Map before, Map after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); assert after != null && before != null; - for (String key : before.keySet()) { + + for (K key : before.keySet()) { if (!after.containsKey(key)) { deletes.add(key); } } - for (Map.Entry partIter : after.entrySet()) { + + for (Map.Entry partIter : after.entrySet()) { T beforePart = before.get(partIter.getKey()); if (beforePart == null) { - adds.put(partIter.getKey(), partIter.getValue()); + upserts.put(partIter.getKey(), partIter.getValue()); } else if (partIter.getValue().equals(beforePart) == false) { - diffs.put(partIter.getKey(), partIter.getValue().diff(beforePart)); + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.getKey(), valueSerializer.diff(partIter.getValue(), beforePart)); + } else { + upserts.put(partIter.getKey(), partIter.getValue()); + } } } } @Override - public Map apply(Map map) { - Map builder = new HashMap<>(); + public Map apply(Map map) { + Map builder = new HashMap<>(); builder.putAll(map); - for (String part : deletes) { + for (K part : deletes) { builder.remove(part); } - for (Map.Entry> diff : diffs.entrySet()) { + for (Map.Entry> diff : diffs.entrySet()) { builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); } - for (Map.Entry additon : adds.entrySet()) { - builder.put(additon.getKey(), additon.getValue()); + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); } return builder; } } /** - * Represents differences between two ImmutableOpenMap of diffable objects + * Represents differences between two ImmutableOpenMap of (possibly diffable) objects * - * @param the diffable object + * @param the object type */ - private static class ImmutableOpenMapDiff> extends MapDiff> { + private static class ImmutableOpenMapDiff extends MapDiff> { - protected ImmutableOpenMapDiff(StreamInput in, KeyedReader reader) throws IOException { - super(in, reader); + protected ImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - public ImmutableOpenMapDiff(ImmutableOpenMap before, ImmutableOpenMap after) { + public ImmutableOpenMapDiff(ImmutableOpenMap before, ImmutableOpenMap after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); assert after != null && before != null; - for (ObjectCursor key : before.keys()) { + + for (ObjectCursor key : before.keys()) { if (!after.containsKey(key.value)) { deletes.add(key.value); } } - for (ObjectObjectCursor partIter : after) { + + for (ObjectObjectCursor partIter : after) { T beforePart = before.get(partIter.key); if (beforePart == null) { - adds.put(partIter.key, partIter.value); + upserts.put(partIter.key, partIter.value); } else if (partIter.value.equals(beforePart) == false) { - diffs.put(partIter.key, partIter.value.diff(beforePart)); + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart)); + } else { + upserts.put(partIter.key, partIter.value); + } } } } @Override - public ImmutableOpenMap apply(ImmutableOpenMap map) { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + public ImmutableOpenMap apply(ImmutableOpenMap map) { + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.putAll(map); - for (String part : deletes) { + for (K part : deletes) { builder.remove(part); } - for (Map.Entry> diff : diffs.entrySet()) { + for (Map.Entry> diff : diffs.entrySet()) { builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); } - for (Map.Entry additon : adds.entrySet()) { - builder.put(additon.getKey(), additon.getValue()); + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); } return builder.build(); } } /** - * Represents differences between two maps of diffable objects + * Represents differences between two ImmutableOpenIntMap of (possibly diffable) objects * - * This class is used as base class for different map implementations - * - * @param the diffable object + * @param the object type */ - private static abstract class MapDiff, M> implements Diff { + private static class ImmutableOpenIntMapDiff extends MapDiff> { - protected final List deletes; - protected final Map> diffs; - protected final Map adds; - - protected MapDiff() { - deletes = new ArrayList<>(); - diffs = new HashMap<>(); - adds = new HashMap<>(); + protected ImmutableOpenIntMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + super(in, keySerializer, valueSerializer); } - protected MapDiff(StreamInput in, KeyedReader reader) throws IOException { + public ImmutableOpenIntMapDiff(ImmutableOpenIntMap before, ImmutableOpenIntMap after, + KeySerializer keySerializer, ValueSerializer valueSerializer) { + super(keySerializer, valueSerializer); + assert after != null && before != null; + + for (IntCursor key : before.keys()) { + if (!after.containsKey(key.value)) { + deletes.add(key.value); + } + } + + for (IntObjectCursor partIter : after) { + T beforePart = before.get(partIter.key); + if (beforePart == null) { + upserts.put(partIter.key, partIter.value); + } else if (partIter.value.equals(beforePart) == false) { + if (valueSerializer.supportsDiffableValues()) { + diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart)); + } else { + upserts.put(partIter.key, partIter.value); + } + } + } + } + + @Override + public ImmutableOpenIntMap apply(ImmutableOpenIntMap map) { + ImmutableOpenIntMap.Builder builder = ImmutableOpenIntMap.builder(); + builder.putAll(map); + + for (Integer part : deletes) { + builder.remove(part); + } + + for (Map.Entry> diff : diffs.entrySet()) { + builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey()))); + } + + for (Map.Entry upsert : upserts.entrySet()) { + builder.put(upsert.getKey(), upsert.getValue()); + } + return builder.build(); + } + } + + /** + * Represents differences between two maps of objects and is used as base class for different map implementations. + * + * Implements serialization. How differences are applied is left to subclasses. + * + * @param the type of map keys + * @param the type of map values + * @param the map implementation type + */ + public static abstract class MapDiff implements Diff { + + protected final List deletes; + protected final Map> diffs; // incremental updates + protected final Map upserts; // additions or full updates + protected final KeySerializer keySerializer; + protected final ValueSerializer valueSerializer; + + protected MapDiff(KeySerializer keySerializer, ValueSerializer valueSerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; deletes = new ArrayList<>(); diffs = new HashMap<>(); - adds = new HashMap<>(); + upserts = new HashMap<>(); + } + + protected MapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + deletes = new ArrayList<>(); + diffs = new HashMap<>(); + upserts = new HashMap<>(); int deletesCount = in.readVInt(); for (int i = 0; i < deletesCount; i++) { - deletes.add(in.readString()); + deletes.add(keySerializer.readKey(in)); } - int diffsCount = in.readVInt(); for (int i = 0; i < diffsCount; i++) { - String key = in.readString(); - Diff diff = reader.readDiffFrom(in, key); + K key = keySerializer.readKey(in); + Diff diff = valueSerializer.readDiff(in, key); diffs.put(key, diff); } - - int addsCount = in.readVInt(); - for (int i = 0; i < addsCount; i++) { - String key = in.readString(); - T part = reader.readFrom(in, key); - adds.put(key, part); + int upsertsCount = in.readVInt(); + for (int i = 0; i < upsertsCount; i++) { + K key = keySerializer.readKey(in); + T newValue = valueSerializer.read(in, key); + upserts.put(key, newValue); } } + + /** + * The keys that, when this diff is applied to a map, should be removed from the map. + * + * @return the list of keys that are deleted + */ + public List getDeletes() { + return deletes; + } + + /** + * Map entries that, when this diff is applied to a map, should be + * incrementally updated. The incremental update is represented using + * the {@link Diff} interface. + * + * @return the map entries that are incrementally updated + */ + public Map> getDiffs() { + return diffs; + } + + /** + * Map entries that, when this diff is applied to a map, should be + * added to the map or fully replace the previous value. + * + * @return the map entries that are additions or full updates + */ + public Map getUpserts() { + return upserts; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(deletes.size()); - for (String delete : deletes) { - out.writeString(delete); + for (K delete : deletes) { + keySerializer.writeKey(delete, out); } - out.writeVInt(diffs.size()); - for (Map.Entry> entry : diffs.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + for (Map.Entry> entry : diffs.entrySet()) { + keySerializer.writeKey(entry.getKey(), out); + valueSerializer.writeDiff(entry.getValue(), out); } - - out.writeVInt(adds.size()); - for (Map.Entry entry : adds.entrySet()) { - out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + out.writeVInt(upserts.size()); + for (Map.Entry entry : upserts.entrySet()) { + keySerializer.writeKey(entry.getKey(), out); + valueSerializer.write(entry.getValue(), out); } } } + + /** + * Provides read and write operations to serialize keys of map + * @param type of key + */ + public interface KeySerializer { + void writeKey(K key, StreamOutput out) throws IOException; + K readKey(StreamInput in) throws IOException; + } + + /** + * Serializes String keys of a map + */ + private static final class StringKeySerializer implements KeySerializer { + private static final StringKeySerializer INSTANCE = new StringKeySerializer(); + + @Override + public void writeKey(String key, StreamOutput out) throws IOException { + out.writeString(key); + } + + @Override + public String readKey(StreamInput in) throws IOException { + return in.readString(); + } + } + + /** + * Serializes Integer keys of a map as an Int + */ + private static final class IntKeySerializer implements KeySerializer { + public static final IntKeySerializer INSTANCE = new IntKeySerializer(); + + @Override + public void writeKey(Integer key, StreamOutput out) throws IOException { + out.writeInt(key); + } + + @Override + public Integer readKey(StreamInput in) throws IOException { + return in.readInt(); + } + } + + /** + * Serializes Integer keys of a map as a VInt. Requires keys to be positive. + */ + private static final class VIntKeySerializer implements KeySerializer { + public static final IntKeySerializer INSTANCE = new IntKeySerializer(); + + @Override + public void writeKey(Integer key, StreamOutput out) throws IOException { + if (key < 0) { + throw new IllegalArgumentException("Map key [" + key + "] must be positive"); + } + out.writeVInt(key); + } + + @Override + public Integer readKey(StreamInput in) throws IOException { + return in.readVInt(); + } + } + + /** + * Provides read and write operations to serialize map values. + * Reading of values can be made dependent on map key. + * + * Also provides operations to distinguish whether map values are diffable. + * + * Should not be directly implemented, instead implement either + * {@link DiffableValueSerializer} or {@link NonDiffableValueSerializer}. + * + * @param key type of map + * @param value type of map + */ + public interface ValueSerializer { + + /** + * Writes value to stream + */ + void write(V value, StreamOutput out) throws IOException; + + /** + * Reads value from stream. Reading operation can be made dependent on map key. + */ + V read(StreamInput in, K key) throws IOException; + + /** + * Whether this serializer supports diffable values + */ + boolean supportsDiffableValues(); + + /** + * Computes diff if this serializer supports diffable values + */ + Diff diff(V value, V beforePart); + + /** + * Writes value as diff to stream if this serializer supports diffable values + */ + void writeDiff(Diff value, StreamOutput out) throws IOException; + + /** + * Reads value as diff from stream if this serializer supports diffable values. + * Reading operation can be made dependent on map key. + */ + Diff readDiff(StreamInput in, K key) throws IOException; + } + + /** + * Serializer for Diffable map values. Needs to implement read and readDiff methods. + * + * @param type of map keys + * @param type of map values + */ + public static abstract class DiffableValueSerializer> implements ValueSerializer { + private static final DiffableValueSerializer WRITE_ONLY_INSTANCE = new DiffableValueSerializer() { + @Override + public Object read(StreamInput in, Object key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Diff readDiff(StreamInput in, Object key) throws IOException { + throw new UnsupportedOperationException(); + } + }; + + private static > DiffableValueSerializer getWriteOnlyInstance() { + return WRITE_ONLY_INSTANCE; + } + + @Override + public boolean supportsDiffableValues() { + return true; + } + + @Override + public Diff diff(V value, V beforePart) { + return value.diff(beforePart); + } + + @Override + public void write(V value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + public void writeDiff(Diff value, StreamOutput out) throws IOException { + value.writeTo(out); + } + } + + /** + * Serializer for non-diffable map values + * + * @param type of map keys + * @param type of map values + */ + public static abstract class NonDiffableValueSerializer implements ValueSerializer { + @Override + public boolean supportsDiffableValues() { + return false; + } + + @Override + public Diff diff(V value, V beforePart) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeDiff(Diff value, StreamOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Diff readDiff(StreamInput in, K key) throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** + * Implementation of the ValueSerializer that uses a prototype object for reading operations + * + * Note: this implementation is ignoring the key. + */ + public static class DiffablePrototypeValueReader> extends DiffableValueSerializer { + private final V proto; + + public DiffablePrototypeValueReader(V proto) { + this.proto = proto; + } + + @Override + public V read(StreamInput in, K key) throws IOException { + return proto.readFrom(in); + } + + @Override + public Diff readDiff(StreamInput in, K key) throws IOException { + return proto.readDiffFrom(in); + } + } + + /** + * Implementation of ValueSerializer that serializes immutable sets + * + * @param type of map key + */ + public static class StringSetValueSerializer extends NonDiffableValueSerializer> { + private static final StringSetValueSerializer INSTANCE = new StringSetValueSerializer(); + + public static StringSetValueSerializer getInstance() { + return INSTANCE; + } + + @Override + public void write(Set value, StreamOutput out) throws IOException { + out.writeStringArray(value.toArray(new String[value.size()])); + } + + @Override + public Set read(StreamInput in, K key) throws IOException { + return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(in.readStringArray()))); + } + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 1a582e63ad2..c210539bc58 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -314,12 +314,12 @@ public class RoutingTable implements Iterable, Diffable before = new HashMap<>(); - before.put("foo", new TestDiffable("1")); - before.put("bar", new TestDiffable("2")); - before.put("baz", new TestDiffable("3")); - before = unmodifiableMap(before); - Map map = new HashMap<>(); - map.putAll(before); - map.remove("bar"); - map.put("baz", new TestDiffable("4")); - map.put("new", new TestDiffable("5")); - Map after = unmodifiableMap(new HashMap<>(map)); - Diff diff = DiffableUtils.diff(before, after); - BytesStreamOutput out = new BytesStreamOutput(); - diff.writeTo(out); - StreamInput in = StreamInput.wrap(out.bytes()); - Map serialized = DiffableUtils.readJdkMapDiff(in, TestDiffable.PROTO).apply(before); - assertThat(serialized.size(), equalTo(3)); - assertThat(serialized.get("foo").value(), equalTo("1")); - assertThat(serialized.get("baz").value(), equalTo("4")); - assertThat(serialized.get("new").value(), equalTo("5")); + + public void testJKDMapDiff() throws IOException { + new JdkMapDriver() { + @Override + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(Map before, Map after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readJdkMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readJdkMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new JdkMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(Map before, Map after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readJdkMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); } public void testImmutableOpenMapDiff() throws IOException { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - builder.put("foo", new TestDiffable("1")); - builder.put("bar", new TestDiffable("2")); - builder.put("baz", new TestDiffable("3")); - ImmutableOpenMap before = builder.build(); - builder = ImmutableOpenMap.builder(before); - builder.remove("bar"); - builder.put("baz", new TestDiffable("4")); - builder.put("new", new TestDiffable("5")); - ImmutableOpenMap after = builder.build(); - Diff diff = DiffableUtils.diff(before, after); - BytesStreamOutput out = new BytesStreamOutput(); - diff.writeTo(out); - StreamInput in = StreamInput.wrap(out.bytes()); - ImmutableOpenMap serialized = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader() { + new ImmutableOpenMapDriver() { @Override - public TestDiffable readFrom(StreamInput in, String key) throws IOException { + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(ImmutableOpenMap before, ImmutableOpenMap after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new ImmutableOpenMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(ImmutableOpenMap before, ImmutableOpenMap after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); + } + + public void testImmutableOpenIntMapDiff() throws IOException { + new ImmutableOpenIntMapDriver() { + @Override + protected boolean diffableValues() { + return true; + } + + @Override + protected TestDiffable createValue(Integer key, boolean before) { + return new TestDiffable(String.valueOf(before ? key : key + 1)); + } + + @Override + protected MapDiff diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after) { + return DiffableUtils.diff(before, after, keySerializer); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return useProtoForDiffableSerialization + ? DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, TestDiffable.PROTO) + : DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, diffableValueSerializer()); + } + }.execute(); + + new ImmutableOpenIntMapDriver() { + @Override + protected boolean diffableValues() { + return false; + } + + @Override + protected String createValue(Integer key, boolean before) { + return String.valueOf(before ? key : key + 1); + } + + @Override + protected MapDiff diff(ImmutableOpenIntMap before, ImmutableOpenIntMap after) { + return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer()); + } + + @Override + protected MapDiff readDiff(StreamInput in) throws IOException { + return DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, nonDiffableValueSerializer()); + } + }.execute(); + } + + /** + * Class that abstracts over specific map implementation type and value kind (Diffable or not) + * @param map type + * @param value type + */ + public abstract class MapDriver { + protected final Set keys = randomPositiveIntSet(); + protected final Set keysToRemove = new HashSet<>(randomSubsetOf(randomInt(keys.size()), keys.toArray(new Integer[keys.size()]))); + protected final Set keysThatAreNotRemoved = Sets.difference(keys, keysToRemove); + protected final Set keysToOverride = new HashSet<>(randomSubsetOf(randomInt(keysThatAreNotRemoved.size()), + keysThatAreNotRemoved.toArray(new Integer[keysThatAreNotRemoved.size()]))); + protected final Set keysToAdd = Sets.difference(randomPositiveIntSet(), keys); // make sure keysToAdd does not contain elements in keys + protected final Set keysUnchanged = Sets.difference(keysThatAreNotRemoved, keysToOverride); + + protected final DiffableUtils.KeySerializer keySerializer = randomBoolean() + ? DiffableUtils.getIntKeySerializer() + : DiffableUtils.getVIntKeySerializer(); + + protected final boolean useProtoForDiffableSerialization = randomBoolean(); + + private Set randomPositiveIntSet() { + int maxSetSize = randomInt(6); + Set result = new HashSet<>(); + for (int i = 0; i < maxSetSize; i++) { + // due to duplicates, set size can be smaller than maxSetSize + result.add(randomIntBetween(0, 100)); + } + return result; + } + + /** + * whether we operate on {@link org.elasticsearch.cluster.Diffable} values + */ + protected abstract boolean diffableValues(); + + /** + * functions that determines value in "before" or "after" map based on key + */ + protected abstract V createValue(Integer key, boolean before); + + /** + * creates map based on JDK-based map + */ + protected abstract T createMap(Map values); + + /** + * calculates diff between two maps + */ + protected abstract MapDiff diff(T before, T after); + + /** + * reads diff of maps from stream + */ + protected abstract MapDiff readDiff(StreamInput in) throws IOException; + + /** + * gets element at key "key" in map "map" + */ + protected abstract V get(T map, Integer key); + + /** + * returns size of given map + */ + protected abstract int size(T map); + + /** + * executes the actual test + */ + public void execute() throws IOException { + logger.debug("Keys in 'before' map: {}", keys); + logger.debug("Keys to remove: {}", keysToRemove); + logger.debug("Keys to override: {}", keysToOverride); + logger.debug("Keys to add: {}", keysToAdd); + + logger.debug("--> creating 'before' map"); + Map before = new HashMap<>(); + for (Integer key : keys) { + before.put(key, createValue(key, true)); + } + T beforeMap = createMap(before); + + logger.debug("--> creating 'after' map"); + Map after = new HashMap<>(); + after.putAll(before); + for (Integer key : keysToRemove) { + after.remove(key); + } + for (Integer key : keysToOverride) { + after.put(key, createValue(key, false)); + } + for (Integer key : keysToAdd) { + after.put(key, createValue(key, false)); + } + T afterMap = createMap(unmodifiableMap(after)); + + MapDiff diffMap = diff(beforeMap, afterMap); + + // check properties of diffMap + assertThat(new HashSet(diffMap.getDeletes()), equalTo(keysToRemove)); + if (diffableValues()) { + assertThat(diffMap.getDiffs().keySet(), equalTo(keysToOverride)); + for (Integer key : keysToOverride) { + assertThat(diffMap.getDiffs().get(key).apply(get(beforeMap, key)), equalTo(get(afterMap, key))); + } + assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAdd)); + for (Integer key : keysToAdd) { + assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key))); + } + } else { + assertThat(diffMap.getDiffs(), equalTo(emptyMap())); + Set keysToAddAndOverride = Sets.union(keysToAdd, keysToOverride); + assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAddAndOverride)); + for (Integer key : keysToAddAndOverride) { + assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key))); + } + } + + if (randomBoolean()) { + logger.debug("--> serializing diff"); + BytesStreamOutput out = new BytesStreamOutput(); + diffMap.writeTo(out); + StreamInput in = StreamInput.wrap(out.bytes()); + logger.debug("--> reading diff back"); + diffMap = readDiff(in); + } + T appliedDiffMap = diffMap.apply(beforeMap); + + // check properties of appliedDiffMap + assertThat(size(appliedDiffMap), equalTo(keys.size() - keysToRemove.size() + keysToAdd.size())); + for (Integer key : keysToRemove) { + assertThat(get(appliedDiffMap, key), nullValue()); + } + for (Integer key : keysUnchanged) { + assertThat(get(appliedDiffMap, key), equalTo(get(beforeMap, key))); + } + for (Integer key : keysToOverride) { + assertThat(get(appliedDiffMap, key), not(equalTo(get(beforeMap, key)))); + assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key))); + } + for (Integer key : keysToAdd) { + assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key))); + } + } + } + + abstract class JdkMapDriver extends MapDriver, V> { + + @Override + protected Map createMap(Map values) { + return values; + } + + @Override + protected V get(Map map, Integer key) { + return map.get(key); + } + + @Override + protected int size(Map map) { + return map.size(); + } + } + + abstract class ImmutableOpenMapDriver extends MapDriver, V> { + + @Override + protected ImmutableOpenMap createMap(Map values) { + return ImmutableOpenMap.builder().putAll(values).build(); + } + + @Override + protected V get(ImmutableOpenMap map, Integer key) { + return map.get(key); + } + + @Override + protected int size(ImmutableOpenMap map) { + return map.size(); + } + } + + + abstract class ImmutableOpenIntMapDriver extends MapDriver, V> { + + @Override + protected ImmutableOpenIntMap createMap(Map values) { + return ImmutableOpenIntMap.builder().putAll(values).build(); + } + + @Override + protected V get(ImmutableOpenIntMap map, Integer key) { + return map.get(key); + } + + @Override + protected int size(ImmutableOpenIntMap map) { + return map.size(); + } + } + + private static DiffableUtils.DiffableValueSerializer diffableValueSerializer() { + return new DiffableUtils.DiffableValueSerializer() { + @Override + public TestDiffable read(StreamInput in, K key) throws IOException { return new TestDiffable(in.readString()); } @Override - public Diff readDiffFrom(StreamInput in, String key) throws IOException { + public Diff readDiff(StreamInput in, K key) throws IOException { return AbstractDiffable.readDiffFrom(new StreamableReader() { @Override public TestDiffable readFrom(StreamInput in) throws IOException { @@ -91,13 +406,23 @@ public class DiffableTests extends ESTestCase { } }, in); } - }).apply(before); - assertThat(serialized.size(), equalTo(3)); - assertThat(serialized.get("foo").value(), equalTo("1")); - assertThat(serialized.get("baz").value(), equalTo("4")); - assertThat(serialized.get("new").value(), equalTo("5")); - + }; } + + private static DiffableUtils.NonDiffableValueSerializer nonDiffableValueSerializer() { + return new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(String value, StreamOutput out) throws IOException { + out.writeString(value); + } + + @Override + public String read(StreamInput in, K key) throws IOException { + return in.readString(); + } + }; + } + public static class TestDiffable extends AbstractDiffable { public static final TestDiffable PROTO = new TestDiffable(""); @@ -121,6 +446,22 @@ public class DiffableTests extends ESTestCase { public void writeTo(StreamOutput out) throws IOException { out.writeString(value); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestDiffable that = (TestDiffable) o; + + return !(value != null ? !value.equals(that.value) : that.value != null); + + } + + @Override + public int hashCode() { + return value != null ? value.hashCode() : 0; + } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 76e6317c1fc..ea590756a8b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -488,17 +488,17 @@ public class NodeJoinControllerTests extends ESTestCase { @Override public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) { - return new RoutingAllocation.Result(false, clusterState.routingTable()); + return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } }