Add a generic way of checking version before serializing custom cluster object
In #22313 we added a check that prevents the SnapshotDeletionsInProgress custom cluster state objects from being sent to older elasticsearch nodes. This commits make this check generic and available to other cluster state custom objects if needed.
This commit is contained in:
parent
74acffaae9
commit
f985638bba
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -30,43 +31,52 @@ import java.io.IOException;
|
|||
* Abstract diffable object with simple diffs implementation that sends the entire object if object has changed or
|
||||
* nothing is object remained the same. Comparing to AbstractDiffable, this class also works with NamedWriteables
|
||||
*/
|
||||
public abstract class AbstractNamedDiffable<T extends Diffable<T> & NamedWriteable> implements Diffable<T>, NamedWriteable {
|
||||
public abstract class AbstractNamedDiffable<T extends NamedDiffable<T>> implements Diffable<T>, NamedWriteable {
|
||||
|
||||
@Override
|
||||
public Diff<T> diff(T previousState) {
|
||||
if (this.get().equals(previousState)) {
|
||||
return new CompleteNamedDiff<>(previousState.getWriteableName());
|
||||
return new CompleteNamedDiff<>(previousState.getWriteableName(), previousState.getMinimalSupportedVersion());
|
||||
} else {
|
||||
return new CompleteNamedDiff<>(get());
|
||||
}
|
||||
}
|
||||
|
||||
public static <T extends Diffable<T> & NamedWriteable> NamedDiff<T> readDiffFrom(Class<? extends T> tClass, String name, StreamInput in)
|
||||
public static <T extends NamedDiffable<T>> NamedDiff<T> readDiffFrom(Class<? extends T> tClass, String name, StreamInput in)
|
||||
throws IOException {
|
||||
return new CompleteNamedDiff<>(tClass, name, in);
|
||||
}
|
||||
|
||||
private static class CompleteNamedDiff<T extends Diffable<T> & NamedWriteable> implements NamedDiff<T> {
|
||||
private static class CompleteNamedDiff<T extends NamedDiffable<T>> implements NamedDiff<T> {
|
||||
|
||||
@Nullable
|
||||
private final T part;
|
||||
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* A non-null value is only required for write operation, if the diff was just read from the stream the version
|
||||
* is unnecessary.
|
||||
*/
|
||||
@Nullable
|
||||
private final Version minimalSupportedVersion;
|
||||
|
||||
/**
|
||||
* Creates simple diff with changes
|
||||
*/
|
||||
public CompleteNamedDiff(T part) {
|
||||
this.part = part;
|
||||
this.name = part.getWriteableName();
|
||||
this.minimalSupportedVersion = part.getMinimalSupportedVersion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates simple diff without changes
|
||||
*/
|
||||
public CompleteNamedDiff(String name) {
|
||||
public CompleteNamedDiff(String name, Version minimalSupportedVersion) {
|
||||
this.part = null;
|
||||
this.name = name;
|
||||
this.minimalSupportedVersion = minimalSupportedVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -75,14 +85,17 @@ public abstract class AbstractNamedDiffable<T extends Diffable<T> & NamedWriteab
|
|||
public CompleteNamedDiff(Class<? extends T> tClass, String name, StreamInput in) throws IOException {
|
||||
if (in.readBoolean()) {
|
||||
this.part = in.readNamedWriteable(tClass, name);
|
||||
this.minimalSupportedVersion = part.getMinimalSupportedVersion();
|
||||
} else {
|
||||
this.part = null;
|
||||
this.minimalSupportedVersion = null; // We just read this diff, so it's not going to be written
|
||||
}
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
assert minimalSupportedVersion != null : "shouldn't be called on diff that was de-serialized from the stream";
|
||||
if (part != null) {
|
||||
out.writeBoolean(true);
|
||||
part.writeTo(out);
|
||||
|
@ -104,6 +117,12 @@ public abstract class AbstractNamedDiffable<T extends Diffable<T> & NamedWriteab
|
|||
public String getWriteableName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getMinimalSupportedVersion() {
|
||||
assert minimalSupportedVersion != null : "shouldn't be called on the diff that was de-serialized from the stream";
|
||||
return minimalSupportedVersion;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster;
|
|||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -37,16 +38,13 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
|
|||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -92,10 +90,11 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
|||
|
||||
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
|
||||
|
||||
public interface Custom extends Diffable<Custom>, ToXContent, NamedWriteable {
|
||||
|
||||
public interface Custom extends NamedDiffable<Custom>, ToXContent {
|
||||
}
|
||||
|
||||
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
|
||||
|
||||
public static final String UNKNOWN_UUID = "_na_";
|
||||
|
||||
public static final long UNKNOWN_VERSION = -1;
|
||||
|
@ -679,19 +678,18 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
|||
routingTable.writeTo(out);
|
||||
nodes.writeTo(out);
|
||||
blocks.writeTo(out);
|
||||
boolean omitSnapshotDeletions = false;
|
||||
if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED)
|
||||
&& customs.containsKey(SnapshotDeletionsInProgress.TYPE)) {
|
||||
// before the stated version, there were no SnapshotDeletionsInProgress, so
|
||||
// don't transfer over the wire protocol
|
||||
omitSnapshotDeletions = true;
|
||||
}
|
||||
out.writeVInt(omitSnapshotDeletions ? customs.size() - 1 : customs.size());
|
||||
for (ObjectObjectCursor<String, Custom> cursor : customs) {
|
||||
if (omitSnapshotDeletions && cursor.key.equals(SnapshotDeletionsInProgress.TYPE)) {
|
||||
continue;
|
||||
// filter out custom states not supported by the other node
|
||||
int numberOfCustoms = 0;
|
||||
for (ObjectCursor<Custom> cursor : customs.values()) {
|
||||
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
|
||||
numberOfCustoms++;
|
||||
}
|
||||
}
|
||||
out.writeVInt(numberOfCustoms);
|
||||
for (ObjectCursor<Custom> cursor : customs.values()) {
|
||||
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
|
||||
out.writeNamedWriteable(cursor.value);
|
||||
}
|
||||
out.writeNamedWriteable(cursor.value);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -724,7 +722,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
|||
nodes = after.nodes.diff(before.nodes);
|
||||
metaData = after.metaData.diff(before.metaData);
|
||||
blocks = after.blocks.diff(before.blocks);
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
}
|
||||
|
||||
public ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
|
||||
|
@ -736,19 +734,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
|||
nodes = DiscoveryNodes.readDiffFrom(in, localNode);
|
||||
metaData = MetaData.readDiffFrom(in);
|
||||
blocks = ClusterBlocks.readDiffFrom(in);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
|
||||
@Override
|
||||
public Custom read(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(Custom.class, key);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(NamedDiff.class, key);
|
||||
}
|
||||
});
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -761,21 +747,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
|
|||
nodes.writeTo(out);
|
||||
metaData.writeTo(out);
|
||||
blocks.writeTo(out);
|
||||
Diff<ImmutableOpenMap<String, Custom>> customsDiff = customs;
|
||||
if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED)) {
|
||||
customsDiff = removeSnapshotDeletionsCustomDiff(customsDiff);
|
||||
}
|
||||
customsDiff.writeTo(out);
|
||||
}
|
||||
|
||||
private Diff<ImmutableOpenMap<String, Custom>> removeSnapshotDeletionsCustomDiff(Diff<ImmutableOpenMap<String, Custom>> customs) {
|
||||
if (customs instanceof DiffableUtils.ImmutableOpenMapDiff) {
|
||||
@SuppressWarnings("unchecked")
|
||||
DiffableUtils.ImmutableOpenMapDiff customsDiff = ((DiffableUtils.ImmutableOpenMapDiff) customs)
|
||||
.withKeyRemoved(SnapshotDeletionsInProgress.TYPE);
|
||||
return customsDiff;
|
||||
}
|
||||
return customs;
|
||||
customs.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.IntCursor;
|
|||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -75,7 +76,7 @@ public final class DiffableUtils {
|
|||
/**
|
||||
* Calculates diff between two ImmutableOpenMaps of non-diffable objects
|
||||
*/
|
||||
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
|
||||
public static <K, T> MapDiff<K, T, ImmutableOpenMap<K, T>> diff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
@ -91,7 +92,7 @@ public final class DiffableUtils {
|
|||
/**
|
||||
* Calculates diff between two ImmutableOpenIntMaps of non-diffable objects
|
||||
*/
|
||||
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer, NonDiffableValueSerializer<Integer, T> valueSerializer) {
|
||||
public static <T> MapDiff<Integer, T, ImmutableOpenIntMap<T>> diff(ImmutableOpenIntMap<T> before, ImmutableOpenIntMap<T> after, KeySerializer<Integer> keySerializer, ValueSerializer<Integer, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new ImmutableOpenIntMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
@ -107,7 +108,7 @@ public final class DiffableUtils {
|
|||
/**
|
||||
* Calculates diff between two Maps of non-diffable objects
|
||||
*/
|
||||
public static <K, T> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, NonDiffableValueSerializer<K, T> valueSerializer) {
|
||||
public static <K, T> MapDiff<K, T, Map<K, T>> diff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
|
||||
assert after != null && before != null;
|
||||
return new JdkMapDiff<>(before, after, keySerializer, valueSerializer);
|
||||
}
|
||||
|
@ -436,12 +437,29 @@ public final class DiffableUtils {
|
|||
for (K delete : deletes) {
|
||||
keySerializer.writeKey(delete, out);
|
||||
}
|
||||
out.writeVInt(diffs.size());
|
||||
for (Map.Entry<K, Diff<T>> entry : diffs.entrySet()) {
|
||||
keySerializer.writeKey(entry.getKey(), out);
|
||||
valueSerializer.writeDiff(entry.getValue(), out);
|
||||
Version version = out.getVersion();
|
||||
// filter out custom states not supported by the other node
|
||||
int diffCount = 0;
|
||||
for (Diff<T> diff : diffs.values()) {
|
||||
if(valueSerializer.supportsVersion(diff, version)) {
|
||||
diffCount++;
|
||||
}
|
||||
}
|
||||
out.writeVInt(upserts.size());
|
||||
out.writeVInt(diffCount);
|
||||
for (Map.Entry<K, Diff<T>> entry : diffs.entrySet()) {
|
||||
if(valueSerializer.supportsVersion(entry.getValue(), version)) {
|
||||
keySerializer.writeKey(entry.getKey(), out);
|
||||
valueSerializer.writeDiff(entry.getValue(), out);
|
||||
}
|
||||
}
|
||||
// filter out custom states not supported by the other node
|
||||
int upsertsCount = 0;
|
||||
for (T upsert : upserts.values()) {
|
||||
if(valueSerializer.supportsVersion(upsert, version)) {
|
||||
upsertsCount++;
|
||||
}
|
||||
}
|
||||
out.writeVInt(upsertsCount);
|
||||
for (Map.Entry<K, T> entry : upserts.entrySet()) {
|
||||
keySerializer.writeKey(entry.getKey(), out);
|
||||
valueSerializer.write(entry.getValue(), out);
|
||||
|
@ -541,6 +559,20 @@ public final class DiffableUtils {
|
|||
*/
|
||||
boolean supportsDiffableValues();
|
||||
|
||||
/**
|
||||
* Whether this serializer supports the version of the output stream
|
||||
*/
|
||||
default boolean supportsVersion(Diff<V> value, Version version) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether this serializer supports the version of the output stream
|
||||
*/
|
||||
default boolean supportsVersion(V value, Version version) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes diff if this serializer supports diffable values
|
||||
*/
|
||||
|
|
|
@ -19,11 +19,18 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
|
||||
/**
|
||||
* Diff that also support NamedWriteable interface
|
||||
*/
|
||||
public interface NamedDiff<T extends Diffable<T>> extends Diff<T>, NamedWriteable {
|
||||
/**
|
||||
* The minimal version of the recipient this custom object can be sent to
|
||||
*/
|
||||
default Version getMinimalSupportedVersion() {
|
||||
return Version.CURRENT.minimumCompatibilityVersion();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
|
||||
/**
|
||||
* Diff that also support NamedWriteable interface
|
||||
*/
|
||||
public interface NamedDiffable<T> extends Diffable<T>, NamedWriteable {
|
||||
/**
|
||||
* The minimal version of the recipient this custom object can be sent to
|
||||
*/
|
||||
default Version getMinimalSupportedVersion() {
|
||||
return Version.CURRENT.minimumCompatibilityVersion();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Value Serializer for named diffables
|
||||
*/
|
||||
public class NamedDiffableValueSerializer<T extends NamedDiffable<T>> extends DiffableUtils.DiffableValueSerializer<String, T> {
|
||||
|
||||
private final Class<T> tClass;
|
||||
|
||||
public NamedDiffableValueSerializer(Class<T> tClass) {
|
||||
this.tClass = tClass;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T read(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(tClass, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsVersion(Diff<T> value, Version version) {
|
||||
return version.onOrAfter(((NamedDiff<?>)value).getMinimalSupportedVersion());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsVersion(T value, Version version) {
|
||||
return version.onOrAfter(value.getMinimalSupportedVersion());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Diff<T> readDiff(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(NamedDiff.class, key);
|
||||
}
|
||||
}
|
|
@ -128,6 +128,11 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
return readDiffFrom(Custom.class, TYPE, in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getMinimalSupportedVersion() {
|
||||
return VERSION_INTRODUCED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startArray(TYPE);
|
||||
|
|
|
@ -24,17 +24,18 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.Diffable;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.NamedDiffable;
|
||||
import org.elasticsearch.cluster.NamedDiffableValueSerializer;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.HppcMaps;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
|
@ -110,7 +111,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
*/
|
||||
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
|
||||
|
||||
public interface Custom extends Diffable<Custom>, ToXContent, NamedWriteable {
|
||||
public interface Custom extends NamedDiffable<Custom>, ToXContent {
|
||||
|
||||
EnumSet<XContentContext> context();
|
||||
}
|
||||
|
@ -130,6 +131,8 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
public static final String GLOBAL_STATE_FILE_PREFIX = "global-";
|
||||
|
||||
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
|
||||
|
||||
private final String clusterUUID;
|
||||
private final long version;
|
||||
|
||||
|
@ -604,7 +607,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
persistentSettings = after.persistentSettings;
|
||||
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
|
||||
templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer());
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
|
||||
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
}
|
||||
|
||||
public MetaDataDiff(StreamInput in) throws IOException {
|
||||
|
@ -616,19 +619,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
IndexMetaData::readDiffFrom);
|
||||
templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), IndexTemplateMetaData::readFrom,
|
||||
IndexTemplateMetaData::readDiffFrom);
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
|
||||
@Override
|
||||
public Custom read(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(Custom.class, key);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
|
||||
return in.readNamedWriteable(NamedDiff.class, key);
|
||||
}
|
||||
});
|
||||
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -692,9 +683,18 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
for (ObjectCursor<IndexTemplateMetaData> cursor : templates.values()) {
|
||||
cursor.value.writeTo(out);
|
||||
}
|
||||
out.writeVInt(customs.size());
|
||||
for (ObjectObjectCursor<String, Custom> cursor : customs) {
|
||||
out.writeNamedWriteable(cursor.value);
|
||||
// filter out custom states not supported by the other node
|
||||
int numberOfCustoms = 0;
|
||||
for (ObjectCursor<Custom> cursor : customs.values()) {
|
||||
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
|
||||
numberOfCustoms++;
|
||||
}
|
||||
}
|
||||
out.writeVInt(numberOfCustoms);
|
||||
for (ObjectCursor<Custom> cursor : customs.values()) {
|
||||
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
|
||||
out.writeNamedWriteable(cursor.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,20 +23,31 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.ESAllocationTestCase;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class ClusterSerializationTests extends ESAllocationTestCase {
|
||||
|
||||
|
@ -89,4 +100,64 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
|
|||
assertThat(target.toString(), equalTo(source.toString()));
|
||||
}
|
||||
|
||||
public void testSnapshotDeletionsInProgressSerialization() throws Exception {
|
||||
|
||||
boolean includeRestore = randomBoolean();
|
||||
|
||||
ClusterState.Builder builder = ClusterState.builder(ClusterState.EMPTY_STATE)
|
||||
.putCustom(SnapshotDeletionsInProgress.TYPE,
|
||||
SnapshotDeletionsInProgress.newInstance(
|
||||
new SnapshotDeletionsInProgress.Entry(
|
||||
new Snapshot("repo1", new SnapshotId("snap1", UUIDs.randomBase64UUID())),
|
||||
randomNonNegativeLong(), randomNonNegativeLong())
|
||||
));
|
||||
if (includeRestore) {
|
||||
builder.putCustom(RestoreInProgress.TYPE,
|
||||
new RestoreInProgress(
|
||||
new RestoreInProgress.Entry(
|
||||
new Snapshot("repo2", new SnapshotId("snap2", UUIDs.randomBase64UUID())),
|
||||
RestoreInProgress.State.STARTED,
|
||||
Collections.singletonList("index_name"),
|
||||
ImmutableOpenMap.of()
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
ClusterState clusterState = builder.incrementVersion().build();
|
||||
|
||||
Diff<ClusterState> diffs = clusterState.diff(ClusterState.EMPTY_STATE);
|
||||
|
||||
// serialize with current version
|
||||
BytesStreamOutput outStream = new BytesStreamOutput();
|
||||
diffs.writeTo(outStream);
|
||||
StreamInput inStream = outStream.bytes().streamInput();
|
||||
inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||
Diff<ClusterState> serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode());
|
||||
ClusterState stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE);
|
||||
assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue());
|
||||
assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue());
|
||||
|
||||
// serialize with old version
|
||||
outStream = new BytesStreamOutput();
|
||||
outStream.setVersion(Version.CURRENT.minimumCompatibilityVersion());
|
||||
diffs.writeTo(outStream);
|
||||
inStream = outStream.bytes().streamInput();
|
||||
inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||
serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode());
|
||||
stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE);
|
||||
assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue());
|
||||
assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), nullValue());
|
||||
|
||||
// remove the custom and try serializing again with old version
|
||||
clusterState = ClusterState.builder(clusterState).removeCustom(SnapshotDeletionsInProgress.TYPE).incrementVersion().build();
|
||||
outStream = new BytesStreamOutput();
|
||||
diffs.writeTo(outStream);
|
||||
inStream = outStream.bytes().streamInput();
|
||||
inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||
serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode());
|
||||
stateAfterDiffs = serializedDiffs.apply(stateAfterDiffs);
|
||||
assertThat(stateAfterDiffs.custom(RestoreInProgress.TYPE), includeRestore ? notNullValue() : nullValue());
|
||||
assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), nullValue());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue