Add capabilities for serializing map-based cluster state diffs

- Supports ImmutableOpenIntMap besides java.util.Map and ImmutableOpenMap
- Map keys can be any value (not only String)
- Map values do not have to implement Diffable interface. In that case custom value serializer needs to be provided.
This commit is contained in:
Yannick Welsch 2015-12-04 11:24:22 +01:00
parent 69fff60624
commit 342665300b
5 changed files with 876 additions and 168 deletions

View File

@ -29,7 +29,7 @@ import java.io.IOException;
public interface Diff<T> {
/**
* Applies difference to the specified part and retunrs the resulted part
* Applies difference to the specified part and returns the resulted part
*/
T apply(T part);

View File

@ -19,263 +19,630 @@
package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public final class DiffableUtils {
private DiffableUtils() {
}
/**
* Returns a map key serializer for String keys
*/
public static KeySerializer<String> getStringKeySerializer() {
return StringKeySerializer.INSTANCE;
}
/**
* Returns a map key serializer for Integer keys. Encodes as Int.
*/
public static KeySerializer<Integer> getIntKeySerializer() {
return IntKeySerializer.INSTANCE;
}
/**
* Returns a map key serializer for Integer keys. Encodes as VInt.
*/
public static KeySerializer<Integer> getVIntKeySerializer() {
return VIntKeySerializer.INSTANCE;
}
/**
* Calculates diff between two ImmutableOpenMaps of Diffable objects
*/
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> diff(ImmutableOpenMap<String, T> before, ImmutableOpenMap<String, T> after) {
public static <K, T extends Diffable<T>> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer) {
assert after != null && before != null;
return new ImmutableOpenMapDiff<>(before, after);
return new ImmutableOpenMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}
/**
* Calculates diff between two ImmutableOpenMaps of non-diffable objects
*/
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
assert after != null && before != null;
return new ImmutableOpenMapDiff<>(before, after, keySerializer, valueSerializer);
}
/**
* Calculates diff between two ImmutableOpenIntMaps of Diffable objects
*/
public static <T extends Diffable<T>> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer) {
assert after != null && before != null;
return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}
/**
* Calculates diff between two ImmutableOpenIntMaps of non-diffable objects
*/
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer, NonDiffableValueSerializer<Integer, T> valueSerializer) {
assert after != null && before != null;
return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, valueSerializer);
}
/**
* Calculates diff between two Maps of Diffable objects.
*/
public static <T extends Diffable<T>> Diff<Map<String, T>> diff(Map<String, T> before, Map<String, T> after) {
public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer) {
assert after != null && before != null;
return new JdkMapDiff<>(before, after);
return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}
/**
* Calculates diff between two Maps of non-diffable objects
*/
public static <K, T> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
assert after != null && before != null;
return new JdkMapDiff<>(before, after, keySerializer, valueSerializer);
}
/**
* Loads an object that represents difference between two ImmutableOpenMaps
*/
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> readImmutableOpenMapDiff(StreamInput in, KeyedReader<T> keyedReader) throws IOException {
return new ImmutableOpenMapDiff<>(in, keyedReader);
}
/**
* Loads an object that represents difference between two Maps.
*/
public static <T extends Diffable<T>> Diff<Map<String, T>> readJdkMapDiff(StreamInput in, KeyedReader<T> keyedReader) throws IOException {
return new JdkMapDiff<>(in, keyedReader);
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> readImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
return new ImmutableOpenMapDiff<>(in, keySerializer, valueSerializer);
}
/**
* Loads an object that represents difference between two ImmutableOpenMaps
*/
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> readImmutableOpenMapDiff(StreamInput in, T proto) throws IOException {
return new ImmutableOpenMapDiff<>(in, new PrototypeReader<>(proto));
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) throws IOException {
return new ImmutableOpenIntMapDiff<>(in, keySerializer, valueSerializer);
}
/**
* Loads an object that represents difference between two Maps.
* Loads an object that represents difference between two Maps of Diffable objects
*/
public static <T extends Diffable<T>> Diff<Map<String, T>> readJdkMapDiff(StreamInput in, T proto) throws IOException {
return new JdkMapDiff<>(in, new PrototypeReader<>(proto));
public static <K, T> MapDiff<K, T, Map<K, T>> readJdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
return new JdkMapDiff<>(in, keySerializer, valueSerializer);
}
/**
* A reader that can deserialize an object. The reader can select the deserialization type based on the key. It's
* used in custom metadata deserialization.
* Loads an object that represents difference between two ImmutableOpenMaps of Diffable objects using Diffable proto object
*/
public interface KeyedReader<T> {
/**
* reads an object of the type T from the stream input
*/
T readFrom(StreamInput in, String key) throws IOException;
/**
* reads an object that respresents differences between two objects with the type T from the stream input
*/
Diff<T> readDiffFrom(StreamInput in, String key) throws IOException;
public static <K, T extends Diffable<T>> MapDiff<K, T, ImmutableOpenMap<K, T>> readImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, T proto) throws IOException {
return new ImmutableOpenMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
}
/**
* Implementation of the KeyedReader that is using a prototype object for reading operations
*
* Note: this implementation is ignoring the key.
* Loads an object that represents difference between two ImmutableOpenIntMaps of Diffable objects using Diffable proto object
*/
public static class PrototypeReader<T extends Diffable<T>> implements KeyedReader<T> {
private T proto;
public PrototypeReader(T proto) {
this.proto = proto;
}
@Override
public T readFrom(StreamInput in, String key) throws IOException {
return proto.readFrom(in);
}
@Override
public Diff<T> readDiffFrom(StreamInput in, String key) throws IOException {
return proto.readDiffFrom(in);
}
public static <T extends Diffable<T>> MapDiff<Integer, T, ImmutableOpenIntMap<T>> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, T proto) throws IOException {
return new ImmutableOpenIntMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
}
/**
* Represents differences between two Maps of Diffable objects.
* Loads an object that represents difference between two Maps of Diffable objects using Diffable proto object
*/
public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> readJdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, T proto) throws IOException {
return new JdkMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
}
/**
* Represents differences between two Maps of (possibly diffable) objects.
*
* @param <T> the diffable object
*/
private static class JdkMapDiff<T extends Diffable<T>> extends MapDiff<T, Map<String, T>> {
private static class JdkMapDiff<K, T> extends MapDiff<K, T, Map<K, T>> {
protected JdkMapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
super(in, reader);
protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
super(in, keySerializer, valueSerializer);
}
public JdkMapDiff(Map<String, T> before, Map<String, T> after) {
public JdkMapDiff(Map<K, T> before, Map<K, T> after,
KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
super(keySerializer, valueSerializer);
assert after != null && before != null;
for (String key : before.keySet()) {
for (K key : before.keySet()) {
if (!after.containsKey(key)) {
deletes.add(key);
}
}
for (Map.Entry<String, T> partIter : after.entrySet()) {
for (Map.Entry<K, T> partIter : after.entrySet()) {
T beforePart = before.get(partIter.getKey());
if (beforePart == null) {
adds.put(partIter.getKey(), partIter.getValue());
upserts.put(partIter.getKey(), partIter.getValue());
} else if (partIter.getValue().equals(beforePart) == false) {
diffs.put(partIter.getKey(), partIter.getValue().diff(beforePart));
if (valueSerializer.supportsDiffableValues()) {
diffs.put(partIter.getKey(), valueSerializer.diff(partIter.getValue(), beforePart));
} else {
upserts.put(partIter.getKey(), partIter.getValue());
}
}
}
}
@Override
public Map<String, T> apply(Map<String, T> map) {
Map<String, T> builder = new HashMap<>();
public Map<K, T> apply(Map<K, T> map) {
Map<K, T> builder = new HashMap<>();
builder.putAll(map);
for (String part : deletes) {
for (K part : deletes) {
builder.remove(part);
}
for (Map.Entry<String, Diff<T>> diff : diffs.entrySet()) {
for (Map.Entry<K, Diff<T>> diff : diffs.entrySet()) {
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
}
for (Map.Entry<String, T> additon : adds.entrySet()) {
builder.put(additon.getKey(), additon.getValue());
for (Map.Entry<K, T> upsert : upserts.entrySet()) {
builder.put(upsert.getKey(), upsert.getValue());
}
return builder;
}
}
/**
* Represents differences between two ImmutableOpenMap of diffable objects
* Represents differences between two ImmutableOpenMap of (possibly diffable) objects
*
* @param <T> the diffable object
* @param <T> the object type
*/
private static class ImmutableOpenMapDiff<T extends Diffable<T>> extends MapDiff<T, ImmutableOpenMap<String, T>> {
private static class ImmutableOpenMapDiff<K, T> extends MapDiff<K, T, ImmutableOpenMap<K, T>> {
protected ImmutableOpenMapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
super(in, reader);
protected ImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
super(in, keySerializer, valueSerializer);
}
public ImmutableOpenMapDiff(ImmutableOpenMap<String, T> before, ImmutableOpenMap<String, T> after) {
public ImmutableOpenMapDiff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after,
KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
super(keySerializer, valueSerializer);
assert after != null && before != null;
for (ObjectCursor<String> key : before.keys()) {
for (ObjectCursor<K> key : before.keys()) {
if (!after.containsKey(key.value)) {
deletes.add(key.value);
}
}
for (ObjectObjectCursor<String, T> partIter : after) {
for (ObjectObjectCursor<K, T> partIter : after) {
T beforePart = before.get(partIter.key);
if (beforePart == null) {
adds.put(partIter.key, partIter.value);
upserts.put(partIter.key, partIter.value);
} else if (partIter.value.equals(beforePart) == false) {
diffs.put(partIter.key, partIter.value.diff(beforePart));
if (valueSerializer.supportsDiffableValues()) {
diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart));
} else {
upserts.put(partIter.key, partIter.value);
}
}
}
}
@Override
public ImmutableOpenMap<String, T> apply(ImmutableOpenMap<String, T> map) {
ImmutableOpenMap.Builder<String, T> builder = ImmutableOpenMap.builder();
public ImmutableOpenMap<K, T> apply(ImmutableOpenMap<K, T> map) {
ImmutableOpenMap.Builder<K, T> builder = ImmutableOpenMap.builder();
builder.putAll(map);
for (String part : deletes) {
for (K part : deletes) {
builder.remove(part);
}
for (Map.Entry<String, Diff<T>> diff : diffs.entrySet()) {
for (Map.Entry<K, Diff<T>> diff : diffs.entrySet()) {
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
}
for (Map.Entry<String, T> additon : adds.entrySet()) {
builder.put(additon.getKey(), additon.getValue());
for (Map.Entry<K, T> upsert : upserts.entrySet()) {
builder.put(upsert.getKey(), upsert.getValue());
}
return builder.build();
}
}
/**
* Represents differences between two maps of diffable objects
* Represents differences between two ImmutableOpenIntMap of (possibly diffable) objects
*
* This class is used as base class for different map implementations
*
* @param <T> the diffable object
* @param <T> the object type
*/
private static abstract class MapDiff<T extends Diffable<T>, M> implements Diff<M> {
private static class ImmutableOpenIntMapDiff<T> extends MapDiff<Integer, T, ImmutableOpenIntMap<T>> {
protected final List<String> deletes;
protected final Map<String, Diff<T>> diffs;
protected final Map<String, T> adds;
protected MapDiff() {
deletes = new ArrayList<>();
diffs = new HashMap<>();
adds = new HashMap<>();
protected ImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) throws IOException {
super(in, keySerializer, valueSerializer);
}
protected MapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
public ImmutableOpenIntMapDiff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after,
KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) {
super(keySerializer, valueSerializer);
assert after != null && before != null;
for (IntCursor key : before.keys()) {
if (!after.containsKey(key.value)) {
deletes.add(key.value);
}
}
for (IntObjectCursor<T> partIter : after) {
T beforePart = before.get(partIter.key);
if (beforePart == null) {
upserts.put(partIter.key, partIter.value);
} else if (partIter.value.equals(beforePart) == false) {
if (valueSerializer.supportsDiffableValues()) {
diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart));
} else {
upserts.put(partIter.key, partIter.value);
}
}
}
}
@Override
public ImmutableOpenIntMap<T> apply(ImmutableOpenIntMap<T> map) {
ImmutableOpenIntMap.Builder<T> builder = ImmutableOpenIntMap.builder();
builder.putAll(map);
for (Integer part : deletes) {
builder.remove(part);
}
for (Map.Entry<Integer, Diff<T>> diff : diffs.entrySet()) {
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
}
for (Map.Entry<Integer, T> upsert : upserts.entrySet()) {
builder.put(upsert.getKey(), upsert.getValue());
}
return builder.build();
}
}
/**
* Represents differences between two maps of objects and is used as base class for different map implementations.
*
* Implements serialization. How differences are applied is left to subclasses.
*
* @param <K> the type of map keys
* @param <T> the type of map values
* @param <M> the map implementation type
*/
public static abstract class MapDiff<K, T, M> implements Diff<M> {
protected final List<K> deletes;
protected final Map<K, Diff<T>> diffs; // incremental updates
protected final Map<K, T> upserts; // additions or full updates
protected final KeySerializer<K> keySerializer;
protected final ValueSerializer<K, T> valueSerializer;
protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
deletes = new ArrayList<>();
diffs = new HashMap<>();
adds = new HashMap<>();
upserts = new HashMap<>();
}
protected MapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
deletes = new ArrayList<>();
diffs = new HashMap<>();
upserts = new HashMap<>();
int deletesCount = in.readVInt();
for (int i = 0; i < deletesCount; i++) {
deletes.add(in.readString());
deletes.add(keySerializer.readKey(in));
}
int diffsCount = in.readVInt();
for (int i = 0; i < diffsCount; i++) {
String key = in.readString();
Diff<T> diff = reader.readDiffFrom(in, key);
K key = keySerializer.readKey(in);
Diff<T> diff = valueSerializer.readDiff(in, key);
diffs.put(key, diff);
}
int addsCount = in.readVInt();
for (int i = 0; i < addsCount; i++) {
String key = in.readString();
T part = reader.readFrom(in, key);
adds.put(key, part);
int upsertsCount = in.readVInt();
for (int i = 0; i < upsertsCount; i++) {
K key = keySerializer.readKey(in);
T newValue = valueSerializer.read(in, key);
upserts.put(key, newValue);
}
}
/**
* The keys that, when this diff is applied to a map, should be removed from the map.
*
* @return the list of keys that are deleted
*/
public List<K> getDeletes() {
return deletes;
}
/**
* Map entries that, when this diff is applied to a map, should be
* incrementally updated. The incremental update is represented using
* the {@link Diff} interface.
*
* @return the map entries that are incrementally updated
*/
public Map<K, Diff<T>> getDiffs() {
return diffs;
}
/**
* Map entries that, when this diff is applied to a map, should be
* added to the map or fully replace the previous value.
*
* @return the map entries that are additions or full updates
*/
public Map<K, T> getUpserts() {
return upserts;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(deletes.size());
for (String delete : deletes) {
out.writeString(delete);
for (K delete : deletes) {
keySerializer.writeKey(delete, out);
}
out.writeVInt(diffs.size());
for (Map.Entry<String, Diff<T>> entry : diffs.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
for (Map.Entry<K, Diff<T>> entry : diffs.entrySet()) {
keySerializer.writeKey(entry.getKey(), out);
valueSerializer.writeDiff(entry.getValue(), out);
}
out.writeVInt(adds.size());
for (Map.Entry<String, T> entry : adds.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
out.writeVInt(upserts.size());
for (Map.Entry<K, T> entry : upserts.entrySet()) {
keySerializer.writeKey(entry.getKey(), out);
valueSerializer.write(entry.getValue(), out);
}
}
}
/**
* Provides read and write operations to serialize keys of map
* @param <K> type of key
*/
public interface KeySerializer<K> {
void writeKey(K key, StreamOutput out) throws IOException;
K readKey(StreamInput in) throws IOException;
}
/**
* Serializes String keys of a map
*/
private static final class StringKeySerializer implements KeySerializer<String> {
private static final StringKeySerializer INSTANCE = new StringKeySerializer();
@Override
public void writeKey(String key, StreamOutput out) throws IOException {
out.writeString(key);
}
@Override
public String readKey(StreamInput in) throws IOException {
return in.readString();
}
}
/**
* Serializes Integer keys of a map as an Int
*/
private static final class IntKeySerializer implements KeySerializer<Integer> {
public static final IntKeySerializer INSTANCE = new IntKeySerializer();
@Override
public void writeKey(Integer key, StreamOutput out) throws IOException {
out.writeInt(key);
}
@Override
public Integer readKey(StreamInput in) throws IOException {
return in.readInt();
}
}
/**
* Serializes Integer keys of a map as a VInt. Requires keys to be positive.
*/
private static final class VIntKeySerializer implements KeySerializer<Integer> {
public static final IntKeySerializer INSTANCE = new IntKeySerializer();
@Override
public void writeKey(Integer key, StreamOutput out) throws IOException {
if (key < 0) {
throw new IllegalArgumentException("Map key [" + key + "] must be positive");
}
out.writeVInt(key);
}
@Override
public Integer readKey(StreamInput in) throws IOException {
return in.readVInt();
}
}
/**
* Provides read and write operations to serialize map values.
* Reading of values can be made dependent on map key.
*
* Also provides operations to distinguish whether map values are diffable.
*
* Should not be directly implemented, instead implement either
* {@link DiffableValueSerializer} or {@link NonDiffableValueSerializer}.
*
* @param <K> key type of map
* @param <V> value type of map
*/
public interface ValueSerializer<K, V> {
/**
* Writes value to stream
*/
void write(V value, StreamOutput out) throws IOException;
/**
* Reads value from stream. Reading operation can be made dependent on map key.
*/
V read(StreamInput in, K key) throws IOException;
/**
* Whether this serializer supports diffable values
*/
boolean supportsDiffableValues();
/**
* Computes diff if this serializer supports diffable values
*/
Diff<V> diff(V value, V beforePart);
/**
* Writes value as diff to stream if this serializer supports diffable values
*/
void writeDiff(Diff<V> value, StreamOutput out) throws IOException;
/**
* Reads value as diff from stream if this serializer supports diffable values.
* Reading operation can be made dependent on map key.
*/
Diff<V> readDiff(StreamInput in, K key) throws IOException;
}
/**
* Serializer for Diffable map values. Needs to implement read and readDiff methods.
*
* @param <K> type of map keys
* @param <V> type of map values
*/
public static abstract class DiffableValueSerializer<K, V extends Diffable<V>> implements ValueSerializer<K, V> {
private static final DiffableValueSerializer WRITE_ONLY_INSTANCE = new DiffableValueSerializer() {
@Override
public Object read(StreamInput in, Object key) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Diff<Object> readDiff(StreamInput in, Object key) throws IOException {
throw new UnsupportedOperationException();
}
};
private static <K, V extends Diffable<V>> DiffableValueSerializer<K, V> getWriteOnlyInstance() {
return WRITE_ONLY_INSTANCE;
}
@Override
public boolean supportsDiffableValues() {
return true;
}
@Override
public Diff<V> diff(V value, V beforePart) {
return value.diff(beforePart);
}
@Override
public void write(V value, StreamOutput out) throws IOException {
value.writeTo(out);
}
public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
value.writeTo(out);
}
}
/**
* Serializer for non-diffable map values
*
* @param <K> type of map keys
* @param <V> type of map values
*/
public static abstract class NonDiffableValueSerializer<K, V> implements ValueSerializer<K, V> {
@Override
public boolean supportsDiffableValues() {
return false;
}
@Override
public Diff<V> diff(V value, V beforePart) {
throw new UnsupportedOperationException();
}
@Override
public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
throw new UnsupportedOperationException();
}
}
/**
* Implementation of the ValueSerializer that uses a prototype object for reading operations
*
* Note: this implementation is ignoring the key.
*/
public static class DiffablePrototypeValueReader<K, V extends Diffable<V>> extends DiffableValueSerializer<K, V> {
private final V proto;
public DiffablePrototypeValueReader(V proto) {
this.proto = proto;
}
@Override
public V read(StreamInput in, K key) throws IOException {
return proto.readFrom(in);
}
@Override
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
return proto.readDiffFrom(in);
}
}
/**
* Implementation of ValueSerializer that serializes immutable sets
*
* @param <K> type of map key
*/
public static class StringSetValueSerializer<K> extends NonDiffableValueSerializer<K, Set<String>> {
private static final StringSetValueSerializer INSTANCE = new StringSetValueSerializer();
public static <K> StringSetValueSerializer<K> getInstance() {
return INSTANCE;
}
@Override
public void write(Set<String> value, StreamOutput out) throws IOException {
out.writeStringArray(value.toArray(new String[value.size()]));
}
@Override
public Set<String> read(StreamInput in, K key) throws IOException {
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(in.readStringArray())));
}
}
}

View File

@ -314,12 +314,12 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
public RoutingTableDiff(RoutingTable before, RoutingTable after) {
version = after.version;
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting);
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting, DiffableUtils.getStringKeySerializer());
}
public RoutingTableDiff(StreamInput in) throws IOException {
version = in.readLong();
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, IndexRoutingTable.PROTO);
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexRoutingTable.PROTO);
}
@Override

View File

@ -22,68 +22,383 @@ package org.elasticsearch.cluster.serialization;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.DiffableUtils.MapDiff;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.StreamableReader;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
public class DiffableTests extends ESTestCase {
public void testJdkMapDiff() throws IOException {
Map<String, TestDiffable> before = new HashMap<>();
before.put("foo", new TestDiffable("1"));
before.put("bar", new TestDiffable("2"));
before.put("baz", new TestDiffable("3"));
before = unmodifiableMap(before);
Map<String, TestDiffable> map = new HashMap<>();
map.putAll(before);
map.remove("bar");
map.put("baz", new TestDiffable("4"));
map.put("new", new TestDiffable("5"));
Map<String, TestDiffable> after = unmodifiableMap(new HashMap<>(map));
Diff diff = DiffableUtils.diff(before, after);
BytesStreamOutput out = new BytesStreamOutput();
diff.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
Map<String, TestDiffable> serialized = DiffableUtils.readJdkMapDiff(in, TestDiffable.PROTO).apply(before);
assertThat(serialized.size(), equalTo(3));
assertThat(serialized.get("foo").value(), equalTo("1"));
assertThat(serialized.get("baz").value(), equalTo("4"));
assertThat(serialized.get("new").value(), equalTo("5"));
public void testJKDMapDiff() throws IOException {
new JdkMapDriver<TestDiffable>() {
@Override
protected boolean diffableValues() {
return true;
}
@Override
protected TestDiffable createValue(Integer key, boolean before) {
return new TestDiffable(String.valueOf(before ? key : key + 1));
}
@Override
protected MapDiff diff(Map<Integer, TestDiffable> before, Map<Integer, TestDiffable> after) {
return DiffableUtils.diff(before, after, keySerializer);
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return useProtoForDiffableSerialization
? DiffableUtils.readJdkMapDiff(in, keySerializer, TestDiffable.PROTO)
: DiffableUtils.readJdkMapDiff(in, keySerializer, diffableValueSerializer());
}
}.execute();
new JdkMapDriver<String>() {
@Override
protected boolean diffableValues() {
return false;
}
@Override
protected String createValue(Integer key, boolean before) {
return String.valueOf(before ? key : key + 1);
}
@Override
protected MapDiff diff(Map<Integer, String> before, Map<Integer, String> after) {
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return DiffableUtils.readJdkMapDiff(in, keySerializer, nonDiffableValueSerializer());
}
}.execute();
}
public void testImmutableOpenMapDiff() throws IOException {
ImmutableOpenMap.Builder<String, TestDiffable> builder = ImmutableOpenMap.builder();
builder.put("foo", new TestDiffable("1"));
builder.put("bar", new TestDiffable("2"));
builder.put("baz", new TestDiffable("3"));
ImmutableOpenMap<String, TestDiffable> before = builder.build();
builder = ImmutableOpenMap.builder(before);
builder.remove("bar");
builder.put("baz", new TestDiffable("4"));
builder.put("new", new TestDiffable("5"));
ImmutableOpenMap<String, TestDiffable> after = builder.build();
Diff diff = DiffableUtils.diff(before, after);
BytesStreamOutput out = new BytesStreamOutput();
diff.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
ImmutableOpenMap<String, TestDiffable> serialized = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<TestDiffable>() {
new ImmutableOpenMapDriver<TestDiffable>() {
@Override
public TestDiffable readFrom(StreamInput in, String key) throws IOException {
protected boolean diffableValues() {
return true;
}
@Override
protected TestDiffable createValue(Integer key, boolean before) {
return new TestDiffable(String.valueOf(before ? key : key + 1));
}
@Override
protected MapDiff diff(ImmutableOpenMap<Integer, TestDiffable> before, ImmutableOpenMap<Integer, TestDiffable> after) {
return DiffableUtils.diff(before, after, keySerializer);
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return useProtoForDiffableSerialization
? DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, TestDiffable.PROTO)
: DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, diffableValueSerializer());
}
}.execute();
new ImmutableOpenMapDriver<String>() {
@Override
protected boolean diffableValues() {
return false;
}
@Override
protected String createValue(Integer key, boolean before) {
return String.valueOf(before ? key : key + 1);
}
@Override
protected MapDiff diff(ImmutableOpenMap<Integer, String> before, ImmutableOpenMap<Integer, String> after) {
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, nonDiffableValueSerializer());
}
}.execute();
}
public void testImmutableOpenIntMapDiff() throws IOException {
new ImmutableOpenIntMapDriver<TestDiffable>() {
@Override
protected boolean diffableValues() {
return true;
}
@Override
protected TestDiffable createValue(Integer key, boolean before) {
return new TestDiffable(String.valueOf(before ? key : key + 1));
}
@Override
protected MapDiff diff(ImmutableOpenIntMap<TestDiffable> before, ImmutableOpenIntMap<TestDiffable> after) {
return DiffableUtils.diff(before, after, keySerializer);
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return useProtoForDiffableSerialization
? DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, TestDiffable.PROTO)
: DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, diffableValueSerializer());
}
}.execute();
new ImmutableOpenIntMapDriver<String>() {
@Override
protected boolean diffableValues() {
return false;
}
@Override
protected String createValue(Integer key, boolean before) {
return String.valueOf(before ? key : key + 1);
}
@Override
protected MapDiff diff(ImmutableOpenIntMap<String> before, ImmutableOpenIntMap<String> after) {
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
}
@Override
protected MapDiff readDiff(StreamInput in) throws IOException {
return DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, nonDiffableValueSerializer());
}
}.execute();
}
/**
* Class that abstracts over specific map implementation type and value kind (Diffable or not)
* @param <T> map type
* @param <V> value type
*/
public abstract class MapDriver<T, V> {
protected final Set<Integer> keys = randomPositiveIntSet();
protected final Set<Integer> keysToRemove = new HashSet<>(randomSubsetOf(randomInt(keys.size()), keys.toArray(new Integer[keys.size()])));
protected final Set<Integer> keysThatAreNotRemoved = Sets.difference(keys, keysToRemove);
protected final Set<Integer> keysToOverride = new HashSet<>(randomSubsetOf(randomInt(keysThatAreNotRemoved.size()),
keysThatAreNotRemoved.toArray(new Integer[keysThatAreNotRemoved.size()])));
protected final Set<Integer> keysToAdd = Sets.difference(randomPositiveIntSet(), keys); // make sure keysToAdd does not contain elements in keys
protected final Set<Integer> keysUnchanged = Sets.difference(keysThatAreNotRemoved, keysToOverride);
protected final DiffableUtils.KeySerializer<Integer> keySerializer = randomBoolean()
? DiffableUtils.getIntKeySerializer()
: DiffableUtils.getVIntKeySerializer();
protected final boolean useProtoForDiffableSerialization = randomBoolean();
private Set<Integer> randomPositiveIntSet() {
int maxSetSize = randomInt(6);
Set<Integer> result = new HashSet<>();
for (int i = 0; i < maxSetSize; i++) {
// due to duplicates, set size can be smaller than maxSetSize
result.add(randomIntBetween(0, 100));
}
return result;
}
/**
* whether we operate on {@link org.elasticsearch.cluster.Diffable} values
*/
protected abstract boolean diffableValues();
/**
* functions that determines value in "before" or "after" map based on key
*/
protected abstract V createValue(Integer key, boolean before);
/**
* creates map based on JDK-based map
*/
protected abstract T createMap(Map<Integer, V> values);
/**
* calculates diff between two maps
*/
protected abstract MapDiff<Integer, V, T> diff(T before, T after);
/**
* reads diff of maps from stream
*/
protected abstract MapDiff<Integer, V, T> readDiff(StreamInput in) throws IOException;
/**
* gets element at key "key" in map "map"
*/
protected abstract V get(T map, Integer key);
/**
* returns size of given map
*/
protected abstract int size(T map);
/**
* executes the actual test
*/
public void execute() throws IOException {
logger.debug("Keys in 'before' map: {}", keys);
logger.debug("Keys to remove: {}", keysToRemove);
logger.debug("Keys to override: {}", keysToOverride);
logger.debug("Keys to add: {}", keysToAdd);
logger.debug("--> creating 'before' map");
Map<Integer, V> before = new HashMap<>();
for (Integer key : keys) {
before.put(key, createValue(key, true));
}
T beforeMap = createMap(before);
logger.debug("--> creating 'after' map");
Map<Integer, V> after = new HashMap<>();
after.putAll(before);
for (Integer key : keysToRemove) {
after.remove(key);
}
for (Integer key : keysToOverride) {
after.put(key, createValue(key, false));
}
for (Integer key : keysToAdd) {
after.put(key, createValue(key, false));
}
T afterMap = createMap(unmodifiableMap(after));
MapDiff<Integer, V, T> diffMap = diff(beforeMap, afterMap);
// check properties of diffMap
assertThat(new HashSet(diffMap.getDeletes()), equalTo(keysToRemove));
if (diffableValues()) {
assertThat(diffMap.getDiffs().keySet(), equalTo(keysToOverride));
for (Integer key : keysToOverride) {
assertThat(diffMap.getDiffs().get(key).apply(get(beforeMap, key)), equalTo(get(afterMap, key)));
}
assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAdd));
for (Integer key : keysToAdd) {
assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key)));
}
} else {
assertThat(diffMap.getDiffs(), equalTo(emptyMap()));
Set<Integer> keysToAddAndOverride = Sets.union(keysToAdd, keysToOverride);
assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAddAndOverride));
for (Integer key : keysToAddAndOverride) {
assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key)));
}
}
if (randomBoolean()) {
logger.debug("--> serializing diff");
BytesStreamOutput out = new BytesStreamOutput();
diffMap.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
logger.debug("--> reading diff back");
diffMap = readDiff(in);
}
T appliedDiffMap = diffMap.apply(beforeMap);
// check properties of appliedDiffMap
assertThat(size(appliedDiffMap), equalTo(keys.size() - keysToRemove.size() + keysToAdd.size()));
for (Integer key : keysToRemove) {
assertThat(get(appliedDiffMap, key), nullValue());
}
for (Integer key : keysUnchanged) {
assertThat(get(appliedDiffMap, key), equalTo(get(beforeMap, key)));
}
for (Integer key : keysToOverride) {
assertThat(get(appliedDiffMap, key), not(equalTo(get(beforeMap, key))));
assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key)));
}
for (Integer key : keysToAdd) {
assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key)));
}
}
}
abstract class JdkMapDriver<V> extends MapDriver<Map<Integer, V>, V> {
@Override
protected Map<Integer, V> createMap(Map values) {
return values;
}
@Override
protected V get(Map<Integer, V> map, Integer key) {
return map.get(key);
}
@Override
protected int size(Map<Integer, V> map) {
return map.size();
}
}
abstract class ImmutableOpenMapDriver<V> extends MapDriver<ImmutableOpenMap<Integer, V>, V> {
@Override
protected ImmutableOpenMap<Integer, V> createMap(Map values) {
return ImmutableOpenMap.<Integer, V>builder().putAll(values).build();
}
@Override
protected V get(ImmutableOpenMap<Integer, V> map, Integer key) {
return map.get(key);
}
@Override
protected int size(ImmutableOpenMap<Integer, V> map) {
return map.size();
}
}
abstract class ImmutableOpenIntMapDriver<V> extends MapDriver<ImmutableOpenIntMap<V>, V> {
@Override
protected ImmutableOpenIntMap<V> createMap(Map values) {
return ImmutableOpenIntMap.<V>builder().putAll(values).build();
}
@Override
protected V get(ImmutableOpenIntMap<V> map, Integer key) {
return map.get(key);
}
@Override
protected int size(ImmutableOpenIntMap<V> map) {
return map.size();
}
}
private static <K> DiffableUtils.DiffableValueSerializer<K, TestDiffable> diffableValueSerializer() {
return new DiffableUtils.DiffableValueSerializer<K, TestDiffable>() {
@Override
public TestDiffable read(StreamInput in, K key) throws IOException {
return new TestDiffable(in.readString());
}
@Override
public Diff<TestDiffable> readDiffFrom(StreamInput in, String key) throws IOException {
public Diff<TestDiffable> readDiff(StreamInput in, K key) throws IOException {
return AbstractDiffable.readDiffFrom(new StreamableReader<TestDiffable>() {
@Override
public TestDiffable readFrom(StreamInput in) throws IOException {
@ -91,13 +406,23 @@ public class DiffableTests extends ESTestCase {
}
}, in);
}
}).apply(before);
assertThat(serialized.size(), equalTo(3));
assertThat(serialized.get("foo").value(), equalTo("1"));
assertThat(serialized.get("baz").value(), equalTo("4"));
assertThat(serialized.get("new").value(), equalTo("5"));
};
}
private static <K> DiffableUtils.NonDiffableValueSerializer<K, String> nonDiffableValueSerializer() {
return new DiffableUtils.NonDiffableValueSerializer<K, String>() {
@Override
public void write(String value, StreamOutput out) throws IOException {
out.writeString(value);
}
@Override
public String read(StreamInput in, K key) throws IOException {
return in.readString();
}
};
}
public static class TestDiffable extends AbstractDiffable<TestDiffable> {
public static final TestDiffable PROTO = new TestDiffable("");
@ -121,6 +446,22 @@ public class DiffableTests extends ESTestCase {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestDiffable that = (TestDiffable) o;
return !(value != null ? !value.equals(that.value) : that.value != null);
}
@Override
public int hashCode() {
return value != null ? value.hashCode() : 0;
}
}
}

View File

@ -488,17 +488,17 @@ public class NodeJoinControllerTests extends ESTestCase {
@Override
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
@Override
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
@Override
protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
}