mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Merge pull request #14964 from ywelsch/feature/persist-allocid-indexmetadata
Persist currently started allocation IDs to index metadata
This commit is contained in:
commit
0c2c7e7ef5
@ -19,9 +19,9 @@
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
@ -469,6 +469,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
||||
}
|
||||
builder.endArray();
|
||||
|
||||
builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
|
||||
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
|
||||
builder.startArray(String.valueOf(cursor.key));
|
||||
for (String allocationId : cursor.value) {
|
||||
builder.value(allocationId);
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
@ -584,6 +594,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
||||
|
||||
public Builder routingResult(RoutingAllocation.Result routingResult) {
|
||||
this.routingTable = routingResult.routingTable();
|
||||
this.metaData = routingResult.metaData();
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -759,7 +770,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
||||
nodes = after.nodes.diff(before.nodes);
|
||||
metaData = after.metaData.diff(before.metaData);
|
||||
blocks = after.blocks.diff(before.blocks);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
|
||||
}
|
||||
|
||||
public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
|
||||
@ -771,14 +782,15 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
||||
nodes = proto.nodes.readDiffFrom(in);
|
||||
metaData = proto.metaData.readDiffFrom(in);
|
||||
blocks = proto.blocks.readDiffFrom(in);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<Custom>() {
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
|
||||
@Override
|
||||
public Custom readFrom(StreamInput in, String key) throws IOException {
|
||||
public Custom read(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<Custom> readDiffFrom(StreamInput in, String key) throws IOException {
|
||||
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readDiffFrom(in);
|
||||
}
|
||||
});
|
||||
|
@ -29,7 +29,7 @@ import java.io.IOException;
|
||||
public interface Diff<T> {
|
||||
|
||||
/**
|
||||
* Applies difference to the specified part and retunrs the resulted part
|
||||
* Applies difference to the specified part and returns the resulted part
|
||||
*/
|
||||
T apply(T part);
|
||||
|
||||
|
@ -19,263 +19,630 @@
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.IntCursor;
|
||||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public final class DiffableUtils {
|
||||
private DiffableUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map key serializer for String keys
|
||||
*/
|
||||
public static KeySerializer<String> getStringKeySerializer() {
|
||||
return StringKeySerializer.INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map key serializer for Integer keys. Encodes as Int.
|
||||
*/
|
||||
public static KeySerializer<Integer> getIntKeySerializer() {
|
||||
return IntKeySerializer.INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map key serializer for Integer keys. Encodes as VInt.
|
||||
*/
|
||||
public static KeySerializer<Integer> getVIntKeySerializer() {
|
||||
return VIntKeySerializer.INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two ImmutableOpenMaps of Diffable objects
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> diff(ImmutableOpenMap<String, T> before, ImmutableOpenMap<String, T> after) {
|
||||
public static <K, T extends Diffable<T>> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenMapDiff<>(before, after);
|
||||
return new ImmutableOpenMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two ImmutableOpenMaps of non-diffable objects
|
||||
*/
|
||||
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two ImmutableOpenIntMaps of Diffable objects
|
||||
*/
|
||||
public static <T extends Diffable<T>> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two ImmutableOpenIntMaps of non-diffable objects
|
||||
*/
|
||||
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer, NonDiffableValueSerializer<Integer, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two Maps of Diffable objects.
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<Map<String, T>> diff(Map<String, T> before, Map<String, T> after) {
|
||||
public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer) {
|
||||
assert after != null && before != null;
|
||||
return new JdkMapDiff<>(before, after);
|
||||
return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates diff between two Maps of non-diffable objects
|
||||
*/
|
||||
public static <K, T> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new JdkMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads an object that represents difference between two ImmutableOpenMaps
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> readImmutableOpenMapDiff(StreamInput in, KeyedReader<T> keyedReader) throws IOException {
|
||||
return new ImmutableOpenMapDiff<>(in, keyedReader);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads an object that represents difference between two Maps.
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<Map<String, T>> readJdkMapDiff(StreamInput in, KeyedReader<T> keyedReader) throws IOException {
|
||||
return new JdkMapDiff<>(in, keyedReader);
|
||||
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> readImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
|
||||
return new ImmutableOpenMapDiff<>(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads an object that represents difference between two ImmutableOpenMaps
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<ImmutableOpenMap<String, T>> readImmutableOpenMapDiff(StreamInput in, T proto) throws IOException {
|
||||
return new ImmutableOpenMapDiff<>(in, new PrototypeReader<>(proto));
|
||||
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) throws IOException {
|
||||
return new ImmutableOpenIntMapDiff<>(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads an object that represents difference between two Maps.
|
||||
* Loads an object that represents difference between two Maps of Diffable objects
|
||||
*/
|
||||
public static <T extends Diffable<T>> Diff<Map<String, T>> readJdkMapDiff(StreamInput in, T proto) throws IOException {
|
||||
return new JdkMapDiff<>(in, new PrototypeReader<>(proto));
|
||||
public static <K, T> MapDiff<K, T, Map<K, T>> readJdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
|
||||
return new JdkMapDiff<>(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* A reader that can deserialize an object. The reader can select the deserialization type based on the key. It's
|
||||
* used in custom metadata deserialization.
|
||||
* Loads an object that represents difference between two ImmutableOpenMaps of Diffable objects using Diffable proto object
|
||||
*/
|
||||
public interface KeyedReader<T> {
|
||||
|
||||
/**
|
||||
* reads an object of the type T from the stream input
|
||||
*/
|
||||
T readFrom(StreamInput in, String key) throws IOException;
|
||||
|
||||
/**
|
||||
* reads an object that respresents differences between two objects with the type T from the stream input
|
||||
*/
|
||||
Diff<T> readDiffFrom(StreamInput in, String key) throws IOException;
|
||||
public static <K, T extends Diffable<T>> MapDiff<K, T, ImmutableOpenMap<K, T>> readImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, T proto) throws IOException {
|
||||
return new ImmutableOpenMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of the KeyedReader that is using a prototype object for reading operations
|
||||
*
|
||||
* Note: this implementation is ignoring the key.
|
||||
* Loads an object that represents difference between two ImmutableOpenIntMaps of Diffable objects using Diffable proto object
|
||||
*/
|
||||
public static class PrototypeReader<T extends Diffable<T>> implements KeyedReader<T> {
|
||||
private T proto;
|
||||
|
||||
public PrototypeReader(T proto) {
|
||||
this.proto = proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T readFrom(StreamInput in, String key) throws IOException {
|
||||
return proto.readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<T> readDiffFrom(StreamInput in, String key) throws IOException {
|
||||
return proto.readDiffFrom(in);
|
||||
}
|
||||
public static <T extends Diffable<T>> MapDiff<Integer, T, ImmutableOpenIntMap<T>> readImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, T proto) throws IOException {
|
||||
return new ImmutableOpenIntMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents differences between two Maps of Diffable objects.
|
||||
* Loads an object that represents difference between two Maps of Diffable objects using Diffable proto object
|
||||
*/
|
||||
public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> readJdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, T proto) throws IOException {
|
||||
return new JdkMapDiff<>(in, keySerializer, new DiffablePrototypeValueReader<>(proto));
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents differences between two Maps of (possibly diffable) objects.
|
||||
*
|
||||
* @param <T> the diffable object
|
||||
*/
|
||||
private static class JdkMapDiff<T extends Diffable<T>> extends MapDiff<T, Map<String, T>> {
|
||||
private static class JdkMapDiff<K, T> extends MapDiff<K, T, Map<K, T>> {
|
||||
|
||||
protected JdkMapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
|
||||
super(in, reader);
|
||||
protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
|
||||
super(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
public JdkMapDiff(Map<String, T> before, Map<String, T> after) {
|
||||
public JdkMapDiff(Map<K, T> before, Map<K, T> after,
|
||||
KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
|
||||
super(keySerializer, valueSerializer);
|
||||
assert after != null && before != null;
|
||||
for (String key : before.keySet()) {
|
||||
|
||||
for (K key : before.keySet()) {
|
||||
if (!after.containsKey(key)) {
|
||||
deletes.add(key);
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, T> partIter : after.entrySet()) {
|
||||
|
||||
for (Map.Entry<K, T> partIter : after.entrySet()) {
|
||||
T beforePart = before.get(partIter.getKey());
|
||||
if (beforePart == null) {
|
||||
adds.put(partIter.getKey(), partIter.getValue());
|
||||
upserts.put(partIter.getKey(), partIter.getValue());
|
||||
} else if (partIter.getValue().equals(beforePart) == false) {
|
||||
diffs.put(partIter.getKey(), partIter.getValue().diff(beforePart));
|
||||
if (valueSerializer.supportsDiffableValues()) {
|
||||
diffs.put(partIter.getKey(), valueSerializer.diff(partIter.getValue(), beforePart));
|
||||
} else {
|
||||
upserts.put(partIter.getKey(), partIter.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, T> apply(Map<String, T> map) {
|
||||
Map<String, T> builder = new HashMap<>();
|
||||
public Map<K, T> apply(Map<K, T> map) {
|
||||
Map<K, T> builder = new HashMap<>();
|
||||
builder.putAll(map);
|
||||
|
||||
for (String part : deletes) {
|
||||
for (K part : deletes) {
|
||||
builder.remove(part);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Diff<T>> diff : diffs.entrySet()) {
|
||||
for (Map.Entry<K, Diff<T>> diff : diffs.entrySet()) {
|
||||
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
|
||||
}
|
||||
|
||||
for (Map.Entry<String, T> additon : adds.entrySet()) {
|
||||
builder.put(additon.getKey(), additon.getValue());
|
||||
for (Map.Entry<K, T> upsert : upserts.entrySet()) {
|
||||
builder.put(upsert.getKey(), upsert.getValue());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents differences between two ImmutableOpenMap of diffable objects
|
||||
* Represents differences between two ImmutableOpenMap of (possibly diffable) objects
|
||||
*
|
||||
* @param <T> the diffable object
|
||||
* @param <T> the object type
|
||||
*/
|
||||
private static class ImmutableOpenMapDiff<T extends Diffable<T>> extends MapDiff<T, ImmutableOpenMap<String, T>> {
|
||||
private static class ImmutableOpenMapDiff<K, T> extends MapDiff<K, T, ImmutableOpenMap<K, T>> {
|
||||
|
||||
protected ImmutableOpenMapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
|
||||
super(in, reader);
|
||||
protected ImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
|
||||
super(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
public ImmutableOpenMapDiff(ImmutableOpenMap<String, T> before, ImmutableOpenMap<String, T> after) {
|
||||
public ImmutableOpenMapDiff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after,
|
||||
KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
|
||||
super(keySerializer, valueSerializer);
|
||||
assert after != null && before != null;
|
||||
for (ObjectCursor<String> key : before.keys()) {
|
||||
|
||||
for (ObjectCursor<K> key : before.keys()) {
|
||||
if (!after.containsKey(key.value)) {
|
||||
deletes.add(key.value);
|
||||
}
|
||||
}
|
||||
for (ObjectObjectCursor<String, T> partIter : after) {
|
||||
|
||||
for (ObjectObjectCursor<K, T> partIter : after) {
|
||||
T beforePart = before.get(partIter.key);
|
||||
if (beforePart == null) {
|
||||
adds.put(partIter.key, partIter.value);
|
||||
upserts.put(partIter.key, partIter.value);
|
||||
} else if (partIter.value.equals(beforePart) == false) {
|
||||
diffs.put(partIter.key, partIter.value.diff(beforePart));
|
||||
if (valueSerializer.supportsDiffableValues()) {
|
||||
diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart));
|
||||
} else {
|
||||
upserts.put(partIter.key, partIter.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableOpenMap<String, T> apply(ImmutableOpenMap<String, T> map) {
|
||||
ImmutableOpenMap.Builder<String, T> builder = ImmutableOpenMap.builder();
|
||||
public ImmutableOpenMap<K, T> apply(ImmutableOpenMap<K, T> map) {
|
||||
ImmutableOpenMap.Builder<K, T> builder = ImmutableOpenMap.builder();
|
||||
builder.putAll(map);
|
||||
|
||||
for (String part : deletes) {
|
||||
for (K part : deletes) {
|
||||
builder.remove(part);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Diff<T>> diff : diffs.entrySet()) {
|
||||
for (Map.Entry<K, Diff<T>> diff : diffs.entrySet()) {
|
||||
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
|
||||
}
|
||||
|
||||
for (Map.Entry<String, T> additon : adds.entrySet()) {
|
||||
builder.put(additon.getKey(), additon.getValue());
|
||||
for (Map.Entry<K, T> upsert : upserts.entrySet()) {
|
||||
builder.put(upsert.getKey(), upsert.getValue());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents differences between two maps of diffable objects
|
||||
* Represents differences between two ImmutableOpenIntMap of (possibly diffable) objects
|
||||
*
|
||||
* This class is used as base class for different map implementations
|
||||
*
|
||||
* @param <T> the diffable object
|
||||
* @param <T> the object type
|
||||
*/
|
||||
private static abstract class MapDiff<T extends Diffable<T>, M> implements Diff<M> {
|
||||
private static class ImmutableOpenIntMapDiff<T> extends MapDiff<Integer, T, ImmutableOpenIntMap<T>> {
|
||||
|
||||
protected final List<String> deletes;
|
||||
protected final Map<String, Diff<T>> diffs;
|
||||
protected final Map<String, T> adds;
|
||||
|
||||
protected MapDiff() {
|
||||
deletes = new ArrayList<>();
|
||||
diffs = new HashMap<>();
|
||||
adds = new HashMap<>();
|
||||
protected ImmutableOpenIntMapDiff(StreamInput in, KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) throws IOException {
|
||||
super(in, keySerializer, valueSerializer);
|
||||
}
|
||||
|
||||
protected MapDiff(StreamInput in, KeyedReader<T> reader) throws IOException {
|
||||
public ImmutableOpenIntMapDiff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after,
|
||||
KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) {
|
||||
super(keySerializer, valueSerializer);
|
||||
assert after != null && before != null;
|
||||
|
||||
for (IntCursor key : before.keys()) {
|
||||
if (!after.containsKey(key.value)) {
|
||||
deletes.add(key.value);
|
||||
}
|
||||
}
|
||||
|
||||
for (IntObjectCursor<T> partIter : after) {
|
||||
T beforePart = before.get(partIter.key);
|
||||
if (beforePart == null) {
|
||||
upserts.put(partIter.key, partIter.value);
|
||||
} else if (partIter.value.equals(beforePart) == false) {
|
||||
if (valueSerializer.supportsDiffableValues()) {
|
||||
diffs.put(partIter.key, valueSerializer.diff(partIter.value, beforePart));
|
||||
} else {
|
||||
upserts.put(partIter.key, partIter.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableOpenIntMap<T> apply(ImmutableOpenIntMap<T> map) {
|
||||
ImmutableOpenIntMap.Builder<T> builder = ImmutableOpenIntMap.builder();
|
||||
builder.putAll(map);
|
||||
|
||||
for (Integer part : deletes) {
|
||||
builder.remove(part);
|
||||
}
|
||||
|
||||
for (Map.Entry<Integer, Diff<T>> diff : diffs.entrySet()) {
|
||||
builder.put(diff.getKey(), diff.getValue().apply(builder.get(diff.getKey())));
|
||||
}
|
||||
|
||||
for (Map.Entry<Integer, T> upsert : upserts.entrySet()) {
|
||||
builder.put(upsert.getKey(), upsert.getValue());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents differences between two maps of objects and is used as base class for different map implementations.
|
||||
*
|
||||
* Implements serialization. How differences are applied is left to subclasses.
|
||||
*
|
||||
* @param <K> the type of map keys
|
||||
* @param <T> the type of map values
|
||||
* @param <M> the map implementation type
|
||||
*/
|
||||
public static abstract class MapDiff<K, T, M> implements Diff<M> {
|
||||
|
||||
protected final List<K> deletes;
|
||||
protected final Map<K, Diff<T>> diffs; // incremental updates
|
||||
protected final Map<K, T> upserts; // additions or full updates
|
||||
protected final KeySerializer<K> keySerializer;
|
||||
protected final ValueSerializer<K, T> valueSerializer;
|
||||
|
||||
protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
deletes = new ArrayList<>();
|
||||
diffs = new HashMap<>();
|
||||
adds = new HashMap<>();
|
||||
upserts = new HashMap<>();
|
||||
}
|
||||
|
||||
protected MapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
deletes = new ArrayList<>();
|
||||
diffs = new HashMap<>();
|
||||
upserts = new HashMap<>();
|
||||
int deletesCount = in.readVInt();
|
||||
for (int i = 0; i < deletesCount; i++) {
|
||||
deletes.add(in.readString());
|
||||
deletes.add(keySerializer.readKey(in));
|
||||
}
|
||||
|
||||
int diffsCount = in.readVInt();
|
||||
for (int i = 0; i < diffsCount; i++) {
|
||||
String key = in.readString();
|
||||
Diff<T> diff = reader.readDiffFrom(in, key);
|
||||
K key = keySerializer.readKey(in);
|
||||
Diff<T> diff = valueSerializer.readDiff(in, key);
|
||||
diffs.put(key, diff);
|
||||
}
|
||||
|
||||
int addsCount = in.readVInt();
|
||||
for (int i = 0; i < addsCount; i++) {
|
||||
String key = in.readString();
|
||||
T part = reader.readFrom(in, key);
|
||||
adds.put(key, part);
|
||||
int upsertsCount = in.readVInt();
|
||||
for (int i = 0; i < upsertsCount; i++) {
|
||||
K key = keySerializer.readKey(in);
|
||||
T newValue = valueSerializer.read(in, key);
|
||||
upserts.put(key, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The keys that, when this diff is applied to a map, should be removed from the map.
|
||||
*
|
||||
* @return the list of keys that are deleted
|
||||
*/
|
||||
public List<K> getDeletes() {
|
||||
return deletes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map entries that, when this diff is applied to a map, should be
|
||||
* incrementally updated. The incremental update is represented using
|
||||
* the {@link Diff} interface.
|
||||
*
|
||||
* @return the map entries that are incrementally updated
|
||||
*/
|
||||
public Map<K, Diff<T>> getDiffs() {
|
||||
return diffs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map entries that, when this diff is applied to a map, should be
|
||||
* added to the map or fully replace the previous value.
|
||||
*
|
||||
* @return the map entries that are additions or full updates
|
||||
*/
|
||||
public Map<K, T> getUpserts() {
|
||||
return upserts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(deletes.size());
|
||||
for (String delete : deletes) {
|
||||
out.writeString(delete);
|
||||
for (K delete : deletes) {
|
||||
keySerializer.writeKey(delete, out);
|
||||
}
|
||||
|
||||
out.writeVInt(diffs.size());
|
||||
for (Map.Entry<String, Diff<T>> entry : diffs.entrySet()) {
|
||||
out.writeString(entry.getKey());
|
||||
entry.getValue().writeTo(out);
|
||||
for (Map.Entry<K, Diff<T>> entry : diffs.entrySet()) {
|
||||
keySerializer.writeKey(entry.getKey(), out);
|
||||
valueSerializer.writeDiff(entry.getValue(), out);
|
||||
}
|
||||
|
||||
out.writeVInt(adds.size());
|
||||
for (Map.Entry<String, T> entry : adds.entrySet()) {
|
||||
out.writeString(entry.getKey());
|
||||
entry.getValue().writeTo(out);
|
||||
out.writeVInt(upserts.size());
|
||||
for (Map.Entry<K, T> entry : upserts.entrySet()) {
|
||||
keySerializer.writeKey(entry.getKey(), out);
|
||||
valueSerializer.write(entry.getValue(), out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides read and write operations to serialize keys of map
|
||||
* @param <K> type of key
|
||||
*/
|
||||
public interface KeySerializer<K> {
|
||||
void writeKey(K key, StreamOutput out) throws IOException;
|
||||
K readKey(StreamInput in) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes String keys of a map
|
||||
*/
|
||||
private static final class StringKeySerializer implements KeySerializer<String> {
|
||||
private static final StringKeySerializer INSTANCE = new StringKeySerializer();
|
||||
|
||||
@Override
|
||||
public void writeKey(String key, StreamOutput out) throws IOException {
|
||||
out.writeString(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readKey(StreamInput in) throws IOException {
|
||||
return in.readString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes Integer keys of a map as an Int
|
||||
*/
|
||||
private static final class IntKeySerializer implements KeySerializer<Integer> {
|
||||
public static final IntKeySerializer INSTANCE = new IntKeySerializer();
|
||||
|
||||
@Override
|
||||
public void writeKey(Integer key, StreamOutput out) throws IOException {
|
||||
out.writeInt(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer readKey(StreamInput in) throws IOException {
|
||||
return in.readInt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes Integer keys of a map as a VInt. Requires keys to be positive.
|
||||
*/
|
||||
private static final class VIntKeySerializer implements KeySerializer<Integer> {
|
||||
public static final IntKeySerializer INSTANCE = new IntKeySerializer();
|
||||
|
||||
@Override
|
||||
public void writeKey(Integer key, StreamOutput out) throws IOException {
|
||||
if (key < 0) {
|
||||
throw new IllegalArgumentException("Map key [" + key + "] must be positive");
|
||||
}
|
||||
out.writeVInt(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer readKey(StreamInput in) throws IOException {
|
||||
return in.readVInt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides read and write operations to serialize map values.
|
||||
* Reading of values can be made dependent on map key.
|
||||
*
|
||||
* Also provides operations to distinguish whether map values are diffable.
|
||||
*
|
||||
* Should not be directly implemented, instead implement either
|
||||
* {@link DiffableValueSerializer} or {@link NonDiffableValueSerializer}.
|
||||
*
|
||||
* @param <K> key type of map
|
||||
* @param <V> value type of map
|
||||
*/
|
||||
public interface ValueSerializer<K, V> {
|
||||
|
||||
/**
|
||||
* Writes value to stream
|
||||
*/
|
||||
void write(V value, StreamOutput out) throws IOException;
|
||||
|
||||
/**
|
||||
* Reads value from stream. Reading operation can be made dependent on map key.
|
||||
*/
|
||||
V read(StreamInput in, K key) throws IOException;
|
||||
|
||||
/**
|
||||
* Whether this serializer supports diffable values
|
||||
*/
|
||||
boolean supportsDiffableValues();
|
||||
|
||||
/**
|
||||
* Computes diff if this serializer supports diffable values
|
||||
*/
|
||||
Diff<V> diff(V value, V beforePart);
|
||||
|
||||
/**
|
||||
* Writes value as diff to stream if this serializer supports diffable values
|
||||
*/
|
||||
void writeDiff(Diff<V> value, StreamOutput out) throws IOException;
|
||||
|
||||
/**
|
||||
* Reads value as diff from stream if this serializer supports diffable values.
|
||||
* Reading operation can be made dependent on map key.
|
||||
*/
|
||||
Diff<V> readDiff(StreamInput in, K key) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializer for Diffable map values. Needs to implement read and readDiff methods.
|
||||
*
|
||||
* @param <K> type of map keys
|
||||
* @param <V> type of map values
|
||||
*/
|
||||
public static abstract class DiffableValueSerializer<K, V extends Diffable<V>> implements ValueSerializer<K, V> {
|
||||
private static final DiffableValueSerializer WRITE_ONLY_INSTANCE = new DiffableValueSerializer() {
|
||||
@Override
|
||||
public Object read(StreamInput in, Object key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<Object> readDiff(StreamInput in, Object key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
||||
private static <K, V extends Diffable<V>> DiffableValueSerializer<K, V> getWriteOnlyInstance() {
|
||||
return WRITE_ONLY_INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsDiffableValues() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<V> diff(V value, V beforePart) {
|
||||
return value.diff(beforePart);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(V value, StreamOutput out) throws IOException {
|
||||
value.writeTo(out);
|
||||
}
|
||||
|
||||
public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
|
||||
value.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializer for non-diffable map values
|
||||
*
|
||||
* @param <K> type of map keys
|
||||
* @param <V> type of map values
|
||||
*/
|
||||
public static abstract class NonDiffableValueSerializer<K, V> implements ValueSerializer<K, V> {
|
||||
@Override
|
||||
public boolean supportsDiffableValues() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<V> diff(V value, V beforePart) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of the ValueSerializer that uses a prototype object for reading operations
|
||||
*
|
||||
* Note: this implementation is ignoring the key.
|
||||
*/
|
||||
public static class DiffablePrototypeValueReader<K, V extends Diffable<V>> extends DiffableValueSerializer<K, V> {
|
||||
private final V proto;
|
||||
|
||||
public DiffablePrototypeValueReader(V proto) {
|
||||
this.proto = proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V read(StreamInput in, K key) throws IOException {
|
||||
return proto.readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
|
||||
return proto.readDiffFrom(in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of ValueSerializer that serializes immutable sets
|
||||
*
|
||||
* @param <K> type of map key
|
||||
*/
|
||||
public static class StringSetValueSerializer<K> extends NonDiffableValueSerializer<K, Set<String>> {
|
||||
private static final StringSetValueSerializer INSTANCE = new StringSetValueSerializer();
|
||||
|
||||
public static <K> StringSetValueSerializer<K> getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Set<String> value, StreamOutput out) throws IOException {
|
||||
out.writeStringArray(value.toArray(new String[value.size()]));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> read(StreamInput in, K key) throws IOException {
|
||||
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(in.readStringArray())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.Version;
|
||||
@ -30,6 +31,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
@ -46,10 +48,13 @@ import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
|
||||
@ -168,6 +173,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node";
|
||||
public static final String INDEX_UUID_NA_VALUE = "_na_";
|
||||
|
||||
public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations";
|
||||
|
||||
private final int numberOfShards;
|
||||
private final int numberOfReplicas;
|
||||
|
||||
@ -184,6 +191,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
|
||||
private final ImmutableOpenMap<String, Custom> customs;
|
||||
|
||||
private final ImmutableOpenIntMap<Set<String>> activeAllocationIds;
|
||||
|
||||
private transient final int totalNumberOfShards;
|
||||
|
||||
private final DiscoveryNodeFilters requireFilters;
|
||||
@ -194,65 +203,29 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
private final Version indexUpgradedVersion;
|
||||
private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
|
||||
|
||||
private IndexMetaData(String index, long version, State state, Settings settings, ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases, ImmutableOpenMap<String, Custom> customs) {
|
||||
Integer maybeNumberOfShards = settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null);
|
||||
if (maybeNumberOfShards == null) {
|
||||
throw new IllegalArgumentException("must specify numberOfShards for index [" + index + "]");
|
||||
}
|
||||
int numberOfShards = maybeNumberOfShards;
|
||||
if (numberOfShards <= 0) {
|
||||
throw new IllegalArgumentException("must specify positive number of shards for index [" + index + "]");
|
||||
}
|
||||
private IndexMetaData(String index, long version, State state, int numberOfShards, int numberOfReplicas, Settings settings,
|
||||
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
|
||||
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
|
||||
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
|
||||
Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion) {
|
||||
|
||||
Integer maybeNumberOfReplicas = settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null);
|
||||
if (maybeNumberOfReplicas == null) {
|
||||
throw new IllegalArgumentException("must specify numberOfReplicas for index [" + index + "]");
|
||||
}
|
||||
int numberOfReplicas = maybeNumberOfReplicas;
|
||||
if (numberOfReplicas < 0) {
|
||||
throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]");
|
||||
}
|
||||
this.index = index;
|
||||
this.version = version;
|
||||
this.state = state;
|
||||
this.settings = settings;
|
||||
this.mappings = mappings;
|
||||
this.customs = customs;
|
||||
this.numberOfShards = numberOfShards;
|
||||
this.numberOfReplicas = numberOfReplicas;
|
||||
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + 1);
|
||||
this.settings = settings;
|
||||
this.mappings = mappings;
|
||||
this.customs = customs;
|
||||
this.aliases = aliases;
|
||||
|
||||
Map<String, String> requireMap = settings.getByPrefix("index.routing.allocation.require.").getAsMap();
|
||||
if (requireMap.isEmpty()) {
|
||||
requireFilters = null;
|
||||
} else {
|
||||
requireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap);
|
||||
}
|
||||
Map<String, String> includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap();
|
||||
if (includeMap.isEmpty()) {
|
||||
includeFilters = null;
|
||||
} else {
|
||||
includeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap);
|
||||
}
|
||||
Map<String, String> excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap();
|
||||
if (excludeMap.isEmpty()) {
|
||||
excludeFilters = null;
|
||||
} else {
|
||||
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
|
||||
}
|
||||
indexCreatedVersion = Version.indexCreated(settings);
|
||||
indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion);
|
||||
String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE);
|
||||
if (stringLuceneVersion != null) {
|
||||
try {
|
||||
this.minimumCompatibleLuceneVersion = org.apache.lucene.util.Version.parse(stringLuceneVersion);
|
||||
} catch (ParseException ex) {
|
||||
throw new IllegalStateException("Cannot parse lucene version [" + stringLuceneVersion + "] in the [" + SETTING_VERSION_MINIMUM_COMPATIBLE +"] setting", ex);
|
||||
}
|
||||
} else {
|
||||
this.minimumCompatibleLuceneVersion = null;
|
||||
}
|
||||
this.activeAllocationIds = activeAllocationIds;
|
||||
this.requireFilters = requireFilters;
|
||||
this.includeFilters = includeFilters;
|
||||
this.excludeFilters = excludeFilters;
|
||||
this.indexCreatedVersion = indexCreatedVersion;
|
||||
this.indexUpgradedVersion = indexUpgradedVersion;
|
||||
this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
@ -364,6 +337,15 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
return (T) customs.get(type);
|
||||
}
|
||||
|
||||
public ImmutableOpenIntMap<Set<String>> getActiveAllocationIds() {
|
||||
return activeAllocationIds;
|
||||
}
|
||||
|
||||
public Set<String> activeAllocationIds(int shardId) {
|
||||
assert shardId >= 0 && shardId < numberOfShards;
|
||||
return activeAllocationIds.get(shardId);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public DiscoveryNodeFilters requireFilters() {
|
||||
return requireFilters;
|
||||
@ -408,6 +390,9 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
if (!customs.equals(that.customs)) {
|
||||
return false;
|
||||
}
|
||||
if (!activeAllocationIds.equals(that.activeAllocationIds)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -418,6 +403,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
result = 31 * result + aliases.hashCode();
|
||||
result = 31 * result + settings.hashCode();
|
||||
result = 31 * result + mappings.hashCode();
|
||||
result = 31 * result + activeAllocationIds.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -450,16 +436,19 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
private final Settings settings;
|
||||
private final Diff<ImmutableOpenMap<String, MappingMetaData>> mappings;
|
||||
private final Diff<ImmutableOpenMap<String, AliasMetaData>> aliases;
|
||||
private Diff<ImmutableOpenMap<String, Custom>> customs;
|
||||
private final Diff<ImmutableOpenMap<String, Custom>> customs;
|
||||
private final Diff<ImmutableOpenIntMap<Set<String>>> activeAllocationIds;
|
||||
|
||||
public IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
|
||||
index = after.index;
|
||||
version = after.version;
|
||||
state = after.state;
|
||||
settings = after.settings;
|
||||
mappings = DiffableUtils.diff(before.mappings, after.mappings);
|
||||
aliases = DiffableUtils.diff(before.aliases, after.aliases);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs);
|
||||
mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer());
|
||||
aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer());
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
|
||||
activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds,
|
||||
DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
|
||||
}
|
||||
|
||||
public IndexMetaDataDiff(StreamInput in) throws IOException {
|
||||
@ -467,19 +456,22 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
version = in.readLong();
|
||||
state = State.fromId(in.readByte());
|
||||
settings = Settings.readSettingsFromStream(in);
|
||||
mappings = DiffableUtils.readImmutableOpenMapDiff(in, MappingMetaData.PROTO);
|
||||
aliases = DiffableUtils.readImmutableOpenMapDiff(in, AliasMetaData.PROTO);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, new DiffableUtils.KeyedReader<Custom>() {
|
||||
mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MappingMetaData.PROTO);
|
||||
aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), AliasMetaData.PROTO);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
|
||||
@Override
|
||||
public Custom readFrom(StreamInput in, String key) throws IOException {
|
||||
public Custom read(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<Custom> readDiffFrom(StreamInput in, String key) throws IOException {
|
||||
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readDiffFrom(in);
|
||||
}
|
||||
});
|
||||
activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(),
|
||||
DiffableUtils.StringSetValueSerializer.getInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -491,6 +483,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
mappings.writeTo(out);
|
||||
aliases.writeTo(out);
|
||||
customs.writeTo(out);
|
||||
activeAllocationIds.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -502,6 +495,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
builder.mappings.putAll(mappings.apply(part.mappings));
|
||||
builder.aliases.putAll(aliases.apply(part.aliases));
|
||||
builder.customs.putAll(customs.apply(part.customs));
|
||||
builder.activeAllocationIds.putAll(activeAllocationIds.apply(part.activeAllocationIds));
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@ -528,6 +522,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
Custom customIndexMetaData = lookupPrototypeSafe(type).readFrom(in);
|
||||
builder.putCustom(type, customIndexMetaData);
|
||||
}
|
||||
int activeAllocationIdsSize = in.readVInt();
|
||||
for (int i = 0; i < activeAllocationIdsSize; i++) {
|
||||
int key = in.readVInt();
|
||||
Set<String> allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, key);
|
||||
builder.putActiveAllocationIds(key, allocationIds);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@ -550,6 +550,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
out.writeString(cursor.key);
|
||||
cursor.value.writeTo(out);
|
||||
}
|
||||
out.writeVInt(activeAllocationIds.size());
|
||||
for (IntObjectCursor<Set<String>> cursor : activeAllocationIds) {
|
||||
out.writeVInt(cursor.key);
|
||||
DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.value, out);
|
||||
}
|
||||
}
|
||||
|
||||
public static Builder builder(String index) {
|
||||
@ -569,12 +574,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
|
||||
private final ImmutableOpenMap.Builder<String, AliasMetaData> aliases;
|
||||
private final ImmutableOpenMap.Builder<String, Custom> customs;
|
||||
private final ImmutableOpenIntMap.Builder<Set<String>> activeAllocationIds;
|
||||
|
||||
public Builder(String index) {
|
||||
this.index = index;
|
||||
this.mappings = ImmutableOpenMap.builder();
|
||||
this.aliases = ImmutableOpenMap.builder();
|
||||
this.customs = ImmutableOpenMap.builder();
|
||||
this.activeAllocationIds = ImmutableOpenIntMap.builder();
|
||||
}
|
||||
|
||||
public Builder(IndexMetaData indexMetaData) {
|
||||
@ -585,6 +592,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
|
||||
this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases);
|
||||
this.customs = ImmutableOpenMap.builder(indexMetaData.customs);
|
||||
this.activeAllocationIds = ImmutableOpenIntMap.builder(indexMetaData.activeAllocationIds);
|
||||
}
|
||||
|
||||
public String index() {
|
||||
@ -693,6 +701,15 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
return this.customs.get(type);
|
||||
}
|
||||
|
||||
public Builder putActiveAllocationIds(int shardId, Set<String> allocationIds) {
|
||||
activeAllocationIds.put(shardId, new HashSet(allocationIds));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Set<String> getActiveAllocationIds(int shardId) {
|
||||
return activeAllocationIds.get(shardId);
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
@ -714,7 +731,72 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
}
|
||||
}
|
||||
|
||||
return new IndexMetaData(index, version, state, tmpSettings, mappings.build(), tmpAliases.build(), customs.build());
|
||||
Integer maybeNumberOfShards = settings.getAsInt(SETTING_NUMBER_OF_SHARDS, null);
|
||||
if (maybeNumberOfShards == null) {
|
||||
throw new IllegalArgumentException("must specify numberOfShards for index [" + index + "]");
|
||||
}
|
||||
int numberOfShards = maybeNumberOfShards;
|
||||
if (numberOfShards <= 0) {
|
||||
throw new IllegalArgumentException("must specify positive number of shards for index [" + index + "]");
|
||||
}
|
||||
|
||||
Integer maybeNumberOfReplicas = settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, null);
|
||||
if (maybeNumberOfReplicas == null) {
|
||||
throw new IllegalArgumentException("must specify numberOfReplicas for index [" + index + "]");
|
||||
}
|
||||
int numberOfReplicas = maybeNumberOfReplicas;
|
||||
if (numberOfReplicas < 0) {
|
||||
throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]");
|
||||
}
|
||||
|
||||
// fill missing slots in activeAllocationIds with empty set if needed and make all entries immutable
|
||||
ImmutableOpenIntMap.Builder<Set<String>> filledActiveAllocationIds = ImmutableOpenIntMap.builder();
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
if (activeAllocationIds.containsKey(i)) {
|
||||
filledActiveAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(activeAllocationIds.get(i))));
|
||||
} else {
|
||||
filledActiveAllocationIds.put(i, Collections.emptySet());
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> requireMap = settings.getByPrefix("index.routing.allocation.require.").getAsMap();
|
||||
final DiscoveryNodeFilters requireFilters;
|
||||
if (requireMap.isEmpty()) {
|
||||
requireFilters = null;
|
||||
} else {
|
||||
requireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap);
|
||||
}
|
||||
Map<String, String> includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap();
|
||||
final DiscoveryNodeFilters includeFilters;
|
||||
if (includeMap.isEmpty()) {
|
||||
includeFilters = null;
|
||||
} else {
|
||||
includeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap);
|
||||
}
|
||||
Map<String, String> excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap();
|
||||
final DiscoveryNodeFilters excludeFilters;
|
||||
if (excludeMap.isEmpty()) {
|
||||
excludeFilters = null;
|
||||
} else {
|
||||
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
|
||||
}
|
||||
Version indexCreatedVersion = Version.indexCreated(settings);
|
||||
Version indexUpgradedVersion = settings.getAsVersion(IndexMetaData.SETTING_VERSION_UPGRADED, indexCreatedVersion);
|
||||
String stringLuceneVersion = settings.get(SETTING_VERSION_MINIMUM_COMPATIBLE);
|
||||
final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
|
||||
if (stringLuceneVersion != null) {
|
||||
try {
|
||||
minimumCompatibleLuceneVersion = org.apache.lucene.util.Version.parse(stringLuceneVersion);
|
||||
} catch (ParseException ex) {
|
||||
throw new IllegalStateException("Cannot parse lucene version [" + stringLuceneVersion + "] in the [" + SETTING_VERSION_MINIMUM_COMPATIBLE +"] setting", ex);
|
||||
}
|
||||
} else {
|
||||
minimumCompatibleLuceneVersion = null;
|
||||
}
|
||||
|
||||
return new IndexMetaData(index, version, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
|
||||
tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
|
||||
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
|
||||
}
|
||||
|
||||
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
@ -757,6 +839,15 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject(KEY_ACTIVE_ALLOCATIONS);
|
||||
for (IntObjectCursor<Set<String>> cursor : indexMetaData.activeAllocationIds) {
|
||||
builder.startArray(String.valueOf(cursor.key));
|
||||
for (String allocationId : cursor.value) {
|
||||
builder.value(allocationId);
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
@ -792,6 +883,21 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
|
||||
}
|
||||
} else if (KEY_ACTIVE_ALLOCATIONS.equals(currentFieldName)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
String shardId = currentFieldName;
|
||||
Set<String> allocationIds = new HashSet<>();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
allocationIds.add(parser.text());
|
||||
}
|
||||
}
|
||||
builder.putActiveAllocationIds(Integer.valueOf(shardId), allocationIds);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// check if its a custom index metadata
|
||||
Custom proto = lookupPrototype(currentFieldName);
|
||||
|
@ -27,7 +27,6 @@ import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.Diffable;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
|
||||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
@ -55,7 +54,6 @@ import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.indices.ttl.IndicesTTLService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
|
||||
@ -641,9 +639,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
|
||||
version = after.version;
|
||||
transientSettings = after.transientSettings;
|
||||
persistentSettings = after.persistentSettings;
|
||||
indices = DiffableUtils.diff(before.indices, after.indices);
|
||||
templates = DiffableUtils.diff(before.templates, after.templates);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs);
|
||||
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
|
||||
templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer());
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
|
||||
}
|
||||
|
||||
public MetaDataDiff(StreamInput in) throws IOException {
|
||||
@ -651,16 +649,17 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
|
||||
version = in.readLong();
|
||||
transientSettings = Settings.readSettingsFromStream(in);
|
||||
persistentSettings = Settings.readSettingsFromStream(in);
|
||||
indices = DiffableUtils.readImmutableOpenMapDiff(in, IndexMetaData.PROTO);
|
||||
templates = DiffableUtils.readImmutableOpenMapDiff(in, IndexTemplateMetaData.PROTO);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<Custom>() {
|
||||
indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexMetaData.PROTO);
|
||||
templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexTemplateMetaData.PROTO);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
|
||||
@Override
|
||||
public Custom readFrom(StreamInput in, String key) throws IOException {
|
||||
public Custom read(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<Custom> readDiffFrom(StreamInput in, String key) throws IOException {
|
||||
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
|
||||
return lookupPrototypeSafe(key).readDiffFrom(in);
|
||||
}
|
||||
});
|
||||
|
@ -314,12 +314,12 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
|
||||
|
||||
public RoutingTableDiff(RoutingTable before, RoutingTable after) {
|
||||
version = after.version;
|
||||
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting);
|
||||
indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting, DiffableUtils.getStringKeySerializer());
|
||||
}
|
||||
|
||||
public RoutingTableDiff(StreamInput in) throws IOException {
|
||||
version = in.readLong();
|
||||
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, IndexRoutingTable.PROTO);
|
||||
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexRoutingTable.PROTO);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -27,7 +27,14 @@ import org.elasticsearch.cluster.health.ClusterStateHealth;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.AllocationId;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
@ -39,6 +46,8 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -79,24 +88,83 @@ public class AllocationService extends AbstractComponent {
|
||||
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo());
|
||||
boolean changed = applyStartedShards(routingNodes, startedShards);
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
shardsAllocators.applyStartedShards(allocation);
|
||||
if (withReroute) {
|
||||
reroute(allocation);
|
||||
}
|
||||
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
|
||||
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
|
||||
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
|
||||
|
||||
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
|
||||
logClusterHealthStateChange(
|
||||
new ClusterStateHealth(clusterState),
|
||||
new ClusterStateHealth(clusterState.metaData(), routingTable),
|
||||
new ClusterStateHealth(clusterState.metaData(), result.routingTable()),
|
||||
"shards started [" + startedShardsAsString + "] ..."
|
||||
);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
|
||||
return buildChangedResult(metaData, routingNodes, new RoutingExplanations());
|
||||
|
||||
}
|
||||
protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
|
||||
final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
|
||||
MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
|
||||
return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
|
||||
*
|
||||
* @param currentMetaData {@link MetaData} object from before the routing table was changed.
|
||||
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
|
||||
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
|
||||
*/
|
||||
static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
|
||||
// make sure index meta data and routing tables are in sync w.r.t active allocation ids
|
||||
MetaData.Builder metaDataBuilder = null;
|
||||
for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
|
||||
final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
|
||||
if (indexMetaData == null) {
|
||||
throw new IllegalStateException("no metadata found for index [" + indexRoutingTable.index() + "]");
|
||||
}
|
||||
IndexMetaData.Builder indexMetaDataBuilder = null;
|
||||
for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
|
||||
Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
|
||||
.map(ShardRouting::allocationId)
|
||||
.filter(Objects::nonNull)
|
||||
.map(AllocationId::getId)
|
||||
.collect(Collectors.toSet());
|
||||
// only update active allocation ids if there is an active shard
|
||||
if (activeAllocationIds.isEmpty() == false) {
|
||||
// get currently stored allocation ids
|
||||
Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
|
||||
if (activeAllocationIds.equals(storedAllocationIds) == false) {
|
||||
if (indexMetaDataBuilder == null) {
|
||||
indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
|
||||
}
|
||||
|
||||
indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (indexMetaDataBuilder != null) {
|
||||
if (metaDataBuilder == null) {
|
||||
metaDataBuilder = MetaData.builder(currentMetaData);
|
||||
}
|
||||
metaDataBuilder.put(indexMetaDataBuilder);
|
||||
}
|
||||
}
|
||||
if (metaDataBuilder != null) {
|
||||
return metaDataBuilder.build();
|
||||
} else {
|
||||
return currentMetaData;
|
||||
}
|
||||
}
|
||||
|
||||
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
|
||||
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null)));
|
||||
}
|
||||
@ -117,16 +185,15 @@ public class AllocationService extends AbstractComponent {
|
||||
System.nanoTime(), System.currentTimeMillis()));
|
||||
}
|
||||
if (!changed) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
shardsAllocators.applyFailedShards(allocation);
|
||||
reroute(allocation);
|
||||
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
|
||||
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
|
||||
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
|
||||
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
|
||||
logClusterHealthStateChange(
|
||||
new ClusterStateHealth(clusterState),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
|
||||
"shards failed [" + failedShardsAsString + "] ..."
|
||||
);
|
||||
return result;
|
||||
@ -169,11 +236,10 @@ public class AllocationService extends AbstractComponent {
|
||||
// the assumption is that commands will move / act on shards (or fail through exceptions)
|
||||
// so, there will always be shard "movements", so no need to check on reroute
|
||||
reroute(allocation);
|
||||
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
|
||||
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable, explanations);
|
||||
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
|
||||
logClusterHealthStateChange(
|
||||
new ClusterStateHealth(clusterState),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
|
||||
"reroute commands"
|
||||
);
|
||||
return result;
|
||||
@ -200,13 +266,12 @@ public class AllocationService extends AbstractComponent {
|
||||
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
|
||||
allocation.debugDecision(debug);
|
||||
if (!reroute(allocation)) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
|
||||
RoutingAllocation.Result result = new RoutingAllocation.Result(true, routingTable);
|
||||
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
|
||||
logClusterHealthStateChange(
|
||||
new ClusterStateHealth(clusterState),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), routingTable),
|
||||
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
|
||||
reason
|
||||
);
|
||||
return result;
|
||||
|
@ -52,29 +52,33 @@ public class RoutingAllocation {
|
||||
|
||||
private final RoutingTable routingTable;
|
||||
|
||||
private final MetaData metaData;
|
||||
|
||||
private RoutingExplanations explanations = new RoutingExplanations();
|
||||
|
||||
/**
|
||||
* Creates a new {@link RoutingAllocation.Result}
|
||||
*
|
||||
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
|
||||
* @param routingTable the {@link RoutingTable} this Result references
|
||||
* @param metaData the {@link MetaData} this Result references
|
||||
*/
|
||||
public Result(boolean changed, RoutingTable routingTable) {
|
||||
public Result(boolean changed, RoutingTable routingTable, MetaData metaData) {
|
||||
this.changed = changed;
|
||||
this.routingTable = routingTable;
|
||||
this.metaData = metaData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link RoutingAllocation.Result}
|
||||
*
|
||||
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
|
||||
* @param routingTable the {@link RoutingTable} this Result references
|
||||
* @param metaData the {@link MetaData} this Result references
|
||||
* @param explanations Explanation for the reroute actions
|
||||
*/
|
||||
public Result(boolean changed, RoutingTable routingTable, RoutingExplanations explanations) {
|
||||
public Result(boolean changed, RoutingTable routingTable, MetaData metaData, RoutingExplanations explanations) {
|
||||
this.changed = changed;
|
||||
this.routingTable = routingTable;
|
||||
this.metaData = metaData;
|
||||
this.explanations = explanations;
|
||||
}
|
||||
|
||||
@ -85,6 +89,14 @@ public class RoutingAllocation {
|
||||
return this.changed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link MetaData} referenced by this result
|
||||
* @return referenced {@link MetaData}
|
||||
*/
|
||||
public MetaData metaData() {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link RoutingTable} referenced by this result
|
||||
* @return referenced {@link RoutingTable}
|
||||
|
@ -0,0 +1,81 @@
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ActiveAllocationIdTests extends ESAllocationTestCase {
|
||||
|
||||
public void testActiveAllocationIdsUpdated() {
|
||||
AllocationService allocation = createAllocationService();
|
||||
|
||||
logger.info("creating an index with 1 shard, 2 replicas");
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
|
||||
// add index metadata where we have no routing nodes to check that allocation ids are not removed
|
||||
.put(IndexMetaData.builder("test-old").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)
|
||||
.putActiveAllocationIds(0, new HashSet<>(Arrays.asList("x", "y"))))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsNew(metaData.index("test"))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
logger.info("adding three nodes and performing rerouting");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(
|
||||
newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
|
||||
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
|
||||
|
||||
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(0));
|
||||
assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
|
||||
|
||||
logger.info("start primary shard");
|
||||
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
|
||||
|
||||
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).size(), equalTo(1));
|
||||
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(1));
|
||||
assertThat(clusterState.getRoutingTable().shardsWithState(STARTED).get(0).allocationId().getId(),
|
||||
equalTo(clusterState.metaData().index("test").activeAllocationIds(0).iterator().next()));
|
||||
assertThat(clusterState.metaData().index("test-old").activeAllocationIds(0), equalTo(new HashSet<>(Arrays.asList("x", "y"))));
|
||||
|
||||
logger.info("start replica shards");
|
||||
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
|
||||
|
||||
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(3));
|
||||
|
||||
logger.info("remove a node");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
|
||||
.remove("node1"))
|
||||
.build();
|
||||
rerouteResult = allocation.reroute(clusterState, "reroute");
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
|
||||
|
||||
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2));
|
||||
|
||||
logger.info("remove all remaining nodes");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
|
||||
.remove("node2").remove("node3"))
|
||||
.build();
|
||||
rerouteResult = allocation.reroute(clusterState, "reroute");
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
|
||||
|
||||
// active allocation ids should not be updated
|
||||
assertThat(clusterState.getRoutingTable().shardsWithState(UNASSIGNED).size(), equalTo(3));
|
||||
assertThat(clusterState.metaData().index("test").activeAllocationIds(0).size(), equalTo(2));
|
||||
}
|
||||
}
|
@ -22,68 +22,383 @@ package org.elasticsearch.cluster.serialization;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
|
||||
import org.elasticsearch.cluster.DiffableUtils.MapDiff;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamableReader;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
|
||||
public class DiffableTests extends ESTestCase {
|
||||
public void testJdkMapDiff() throws IOException {
|
||||
Map<String, TestDiffable> before = new HashMap<>();
|
||||
before.put("foo", new TestDiffable("1"));
|
||||
before.put("bar", new TestDiffable("2"));
|
||||
before.put("baz", new TestDiffable("3"));
|
||||
before = unmodifiableMap(before);
|
||||
Map<String, TestDiffable> map = new HashMap<>();
|
||||
map.putAll(before);
|
||||
map.remove("bar");
|
||||
map.put("baz", new TestDiffable("4"));
|
||||
map.put("new", new TestDiffable("5"));
|
||||
Map<String, TestDiffable> after = unmodifiableMap(new HashMap<>(map));
|
||||
Diff diff = DiffableUtils.diff(before, after);
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
diff.writeTo(out);
|
||||
StreamInput in = StreamInput.wrap(out.bytes());
|
||||
Map<String, TestDiffable> serialized = DiffableUtils.readJdkMapDiff(in, TestDiffable.PROTO).apply(before);
|
||||
assertThat(serialized.size(), equalTo(3));
|
||||
assertThat(serialized.get("foo").value(), equalTo("1"));
|
||||
assertThat(serialized.get("baz").value(), equalTo("4"));
|
||||
assertThat(serialized.get("new").value(), equalTo("5"));
|
||||
|
||||
public void testJKDMapDiff() throws IOException {
|
||||
new JdkMapDriver<TestDiffable>() {
|
||||
@Override
|
||||
protected boolean diffableValues() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TestDiffable createValue(Integer key, boolean before) {
|
||||
return new TestDiffable(String.valueOf(before ? key : key + 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(Map<Integer, TestDiffable> before, Map<Integer, TestDiffable> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return useProtoForDiffableSerialization
|
||||
? DiffableUtils.readJdkMapDiff(in, keySerializer, TestDiffable.PROTO)
|
||||
: DiffableUtils.readJdkMapDiff(in, keySerializer, diffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
|
||||
new JdkMapDriver<String>() {
|
||||
@Override
|
||||
protected boolean diffableValues() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createValue(Integer key, boolean before) {
|
||||
return String.valueOf(before ? key : key + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(Map<Integer, String> before, Map<Integer, String> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return DiffableUtils.readJdkMapDiff(in, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
|
||||
public void testImmutableOpenMapDiff() throws IOException {
|
||||
ImmutableOpenMap.Builder<String, TestDiffable> builder = ImmutableOpenMap.builder();
|
||||
builder.put("foo", new TestDiffable("1"));
|
||||
builder.put("bar", new TestDiffable("2"));
|
||||
builder.put("baz", new TestDiffable("3"));
|
||||
ImmutableOpenMap<String, TestDiffable> before = builder.build();
|
||||
builder = ImmutableOpenMap.builder(before);
|
||||
builder.remove("bar");
|
||||
builder.put("baz", new TestDiffable("4"));
|
||||
builder.put("new", new TestDiffable("5"));
|
||||
ImmutableOpenMap<String, TestDiffable> after = builder.build();
|
||||
Diff diff = DiffableUtils.diff(before, after);
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
diff.writeTo(out);
|
||||
StreamInput in = StreamInput.wrap(out.bytes());
|
||||
ImmutableOpenMap<String, TestDiffable> serialized = DiffableUtils.readImmutableOpenMapDiff(in, new KeyedReader<TestDiffable>() {
|
||||
new ImmutableOpenMapDriver<TestDiffable>() {
|
||||
@Override
|
||||
public TestDiffable readFrom(StreamInput in, String key) throws IOException {
|
||||
protected boolean diffableValues() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TestDiffable createValue(Integer key, boolean before) {
|
||||
return new TestDiffable(String.valueOf(before ? key : key + 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(ImmutableOpenMap<Integer, TestDiffable> before, ImmutableOpenMap<Integer, TestDiffable> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return useProtoForDiffableSerialization
|
||||
? DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, TestDiffable.PROTO)
|
||||
: DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, diffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
|
||||
new ImmutableOpenMapDriver<String>() {
|
||||
@Override
|
||||
protected boolean diffableValues() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createValue(Integer key, boolean before) {
|
||||
return String.valueOf(before ? key : key + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(ImmutableOpenMap<Integer, String> before, ImmutableOpenMap<Integer, String> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return DiffableUtils.readImmutableOpenMapDiff(in, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
|
||||
public void testImmutableOpenIntMapDiff() throws IOException {
|
||||
new ImmutableOpenIntMapDriver<TestDiffable>() {
|
||||
@Override
|
||||
protected boolean diffableValues() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TestDiffable createValue(Integer key, boolean before) {
|
||||
return new TestDiffable(String.valueOf(before ? key : key + 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(ImmutableOpenIntMap<TestDiffable> before, ImmutableOpenIntMap<TestDiffable> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return useProtoForDiffableSerialization
|
||||
? DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, TestDiffable.PROTO)
|
||||
: DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, diffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
|
||||
new ImmutableOpenIntMapDriver<String>() {
|
||||
@Override
|
||||
protected boolean diffableValues() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createValue(Integer key, boolean before) {
|
||||
return String.valueOf(before ? key : key + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff diff(ImmutableOpenIntMap<String> before, ImmutableOpenIntMap<String> after) {
|
||||
return DiffableUtils.diff(before, after, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MapDiff readDiff(StreamInput in) throws IOException {
|
||||
return DiffableUtils.readImmutableOpenIntMapDiff(in, keySerializer, nonDiffableValueSerializer());
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
|
||||
/**
|
||||
* Class that abstracts over specific map implementation type and value kind (Diffable or not)
|
||||
* @param <T> map type
|
||||
* @param <V> value type
|
||||
*/
|
||||
public abstract class MapDriver<T, V> {
|
||||
protected final Set<Integer> keys = randomPositiveIntSet();
|
||||
protected final Set<Integer> keysToRemove = new HashSet<>(randomSubsetOf(randomInt(keys.size()), keys.toArray(new Integer[keys.size()])));
|
||||
protected final Set<Integer> keysThatAreNotRemoved = Sets.difference(keys, keysToRemove);
|
||||
protected final Set<Integer> keysToOverride = new HashSet<>(randomSubsetOf(randomInt(keysThatAreNotRemoved.size()),
|
||||
keysThatAreNotRemoved.toArray(new Integer[keysThatAreNotRemoved.size()])));
|
||||
protected final Set<Integer> keysToAdd = Sets.difference(randomPositiveIntSet(), keys); // make sure keysToAdd does not contain elements in keys
|
||||
protected final Set<Integer> keysUnchanged = Sets.difference(keysThatAreNotRemoved, keysToOverride);
|
||||
|
||||
protected final DiffableUtils.KeySerializer<Integer> keySerializer = randomBoolean()
|
||||
? DiffableUtils.getIntKeySerializer()
|
||||
: DiffableUtils.getVIntKeySerializer();
|
||||
|
||||
protected final boolean useProtoForDiffableSerialization = randomBoolean();
|
||||
|
||||
private Set<Integer> randomPositiveIntSet() {
|
||||
int maxSetSize = randomInt(6);
|
||||
Set<Integer> result = new HashSet<>();
|
||||
for (int i = 0; i < maxSetSize; i++) {
|
||||
// due to duplicates, set size can be smaller than maxSetSize
|
||||
result.add(randomIntBetween(0, 100));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* whether we operate on {@link org.elasticsearch.cluster.Diffable} values
|
||||
*/
|
||||
protected abstract boolean diffableValues();
|
||||
|
||||
/**
|
||||
* functions that determines value in "before" or "after" map based on key
|
||||
*/
|
||||
protected abstract V createValue(Integer key, boolean before);
|
||||
|
||||
/**
|
||||
* creates map based on JDK-based map
|
||||
*/
|
||||
protected abstract T createMap(Map<Integer, V> values);
|
||||
|
||||
/**
|
||||
* calculates diff between two maps
|
||||
*/
|
||||
protected abstract MapDiff<Integer, V, T> diff(T before, T after);
|
||||
|
||||
/**
|
||||
* reads diff of maps from stream
|
||||
*/
|
||||
protected abstract MapDiff<Integer, V, T> readDiff(StreamInput in) throws IOException;
|
||||
|
||||
/**
|
||||
* gets element at key "key" in map "map"
|
||||
*/
|
||||
protected abstract V get(T map, Integer key);
|
||||
|
||||
/**
|
||||
* returns size of given map
|
||||
*/
|
||||
protected abstract int size(T map);
|
||||
|
||||
/**
|
||||
* executes the actual test
|
||||
*/
|
||||
public void execute() throws IOException {
|
||||
logger.debug("Keys in 'before' map: {}", keys);
|
||||
logger.debug("Keys to remove: {}", keysToRemove);
|
||||
logger.debug("Keys to override: {}", keysToOverride);
|
||||
logger.debug("Keys to add: {}", keysToAdd);
|
||||
|
||||
logger.debug("--> creating 'before' map");
|
||||
Map<Integer, V> before = new HashMap<>();
|
||||
for (Integer key : keys) {
|
||||
before.put(key, createValue(key, true));
|
||||
}
|
||||
T beforeMap = createMap(before);
|
||||
|
||||
logger.debug("--> creating 'after' map");
|
||||
Map<Integer, V> after = new HashMap<>();
|
||||
after.putAll(before);
|
||||
for (Integer key : keysToRemove) {
|
||||
after.remove(key);
|
||||
}
|
||||
for (Integer key : keysToOverride) {
|
||||
after.put(key, createValue(key, false));
|
||||
}
|
||||
for (Integer key : keysToAdd) {
|
||||
after.put(key, createValue(key, false));
|
||||
}
|
||||
T afterMap = createMap(unmodifiableMap(after));
|
||||
|
||||
MapDiff<Integer, V, T> diffMap = diff(beforeMap, afterMap);
|
||||
|
||||
// check properties of diffMap
|
||||
assertThat(new HashSet(diffMap.getDeletes()), equalTo(keysToRemove));
|
||||
if (diffableValues()) {
|
||||
assertThat(diffMap.getDiffs().keySet(), equalTo(keysToOverride));
|
||||
for (Integer key : keysToOverride) {
|
||||
assertThat(diffMap.getDiffs().get(key).apply(get(beforeMap, key)), equalTo(get(afterMap, key)));
|
||||
}
|
||||
assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAdd));
|
||||
for (Integer key : keysToAdd) {
|
||||
assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key)));
|
||||
}
|
||||
} else {
|
||||
assertThat(diffMap.getDiffs(), equalTo(emptyMap()));
|
||||
Set<Integer> keysToAddAndOverride = Sets.union(keysToAdd, keysToOverride);
|
||||
assertThat(diffMap.getUpserts().keySet(), equalTo(keysToAddAndOverride));
|
||||
for (Integer key : keysToAddAndOverride) {
|
||||
assertThat(diffMap.getUpserts().get(key), equalTo(get(afterMap, key)));
|
||||
}
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
logger.debug("--> serializing diff");
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
diffMap.writeTo(out);
|
||||
StreamInput in = StreamInput.wrap(out.bytes());
|
||||
logger.debug("--> reading diff back");
|
||||
diffMap = readDiff(in);
|
||||
}
|
||||
T appliedDiffMap = diffMap.apply(beforeMap);
|
||||
|
||||
// check properties of appliedDiffMap
|
||||
assertThat(size(appliedDiffMap), equalTo(keys.size() - keysToRemove.size() + keysToAdd.size()));
|
||||
for (Integer key : keysToRemove) {
|
||||
assertThat(get(appliedDiffMap, key), nullValue());
|
||||
}
|
||||
for (Integer key : keysUnchanged) {
|
||||
assertThat(get(appliedDiffMap, key), equalTo(get(beforeMap, key)));
|
||||
}
|
||||
for (Integer key : keysToOverride) {
|
||||
assertThat(get(appliedDiffMap, key), not(equalTo(get(beforeMap, key))));
|
||||
assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key)));
|
||||
}
|
||||
for (Integer key : keysToAdd) {
|
||||
assertThat(get(appliedDiffMap, key), equalTo(get(afterMap, key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class JdkMapDriver<V> extends MapDriver<Map<Integer, V>, V> {
|
||||
|
||||
@Override
|
||||
protected Map<Integer, V> createMap(Map values) {
|
||||
return values;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected V get(Map<Integer, V> map, Integer key) {
|
||||
return map.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int size(Map<Integer, V> map) {
|
||||
return map.size();
|
||||
}
|
||||
}
|
||||
|
||||
abstract class ImmutableOpenMapDriver<V> extends MapDriver<ImmutableOpenMap<Integer, V>, V> {
|
||||
|
||||
@Override
|
||||
protected ImmutableOpenMap<Integer, V> createMap(Map values) {
|
||||
return ImmutableOpenMap.<Integer, V>builder().putAll(values).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected V get(ImmutableOpenMap<Integer, V> map, Integer key) {
|
||||
return map.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int size(ImmutableOpenMap<Integer, V> map) {
|
||||
return map.size();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
abstract class ImmutableOpenIntMapDriver<V> extends MapDriver<ImmutableOpenIntMap<V>, V> {
|
||||
|
||||
@Override
|
||||
protected ImmutableOpenIntMap<V> createMap(Map values) {
|
||||
return ImmutableOpenIntMap.<V>builder().putAll(values).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected V get(ImmutableOpenIntMap<V> map, Integer key) {
|
||||
return map.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int size(ImmutableOpenIntMap<V> map) {
|
||||
return map.size();
|
||||
}
|
||||
}
|
||||
|
||||
private static <K> DiffableUtils.DiffableValueSerializer<K, TestDiffable> diffableValueSerializer() {
|
||||
return new DiffableUtils.DiffableValueSerializer<K, TestDiffable>() {
|
||||
@Override
|
||||
public TestDiffable read(StreamInput in, K key) throws IOException {
|
||||
return new TestDiffable(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<TestDiffable> readDiffFrom(StreamInput in, String key) throws IOException {
|
||||
public Diff<TestDiffable> readDiff(StreamInput in, K key) throws IOException {
|
||||
return AbstractDiffable.readDiffFrom(new StreamableReader<TestDiffable>() {
|
||||
@Override
|
||||
public TestDiffable readFrom(StreamInput in) throws IOException {
|
||||
@ -91,13 +406,23 @@ public class DiffableTests extends ESTestCase {
|
||||
}
|
||||
}, in);
|
||||
}
|
||||
}).apply(before);
|
||||
assertThat(serialized.size(), equalTo(3));
|
||||
assertThat(serialized.get("foo").value(), equalTo("1"));
|
||||
assertThat(serialized.get("baz").value(), equalTo("4"));
|
||||
assertThat(serialized.get("new").value(), equalTo("5"));
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
private static <K> DiffableUtils.NonDiffableValueSerializer<K, String> nonDiffableValueSerializer() {
|
||||
return new DiffableUtils.NonDiffableValueSerializer<K, String>() {
|
||||
@Override
|
||||
public void write(String value, StreamOutput out) throws IOException {
|
||||
out.writeString(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String read(StreamInput in, K key) throws IOException {
|
||||
return in.readString();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static class TestDiffable extends AbstractDiffable<TestDiffable> {
|
||||
|
||||
public static final TestDiffable PROTO = new TestDiffable("");
|
||||
@ -121,6 +446,22 @@ public class DiffableTests extends ESTestCase {
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
TestDiffable that = (TestDiffable) o;
|
||||
|
||||
return !(value != null ? !value.equals(that.value) : that.value != null);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return value != null ? value.hashCode() : 0;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -488,17 +488,17 @@ public class NodeJoinControllerTests extends ESTestCase {
|
||||
|
||||
@Override
|
||||
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user