Fix interoperability with < 6.3 transport clients (#30971)
With the default distribution changing in 6.3, clusters might now contain custom metadata that a pure OSS transport client cannot deserialize. As this can break transport clients when accessing the cluster state or reroute APIs, we've decided to exclude any custom metadata that the transport client might not be able to deserialize. This will ensure compatibility between a < 6.3 transport client and a 6.3 default distribution cluster. Note that this PR only covers interoperability with older clients, another follow-up PR will cover full interoperability for >= 6.3 transport clients where we will make it possible again to get the custom metadata from the cluster state. Relates to #30731
This commit is contained in:
parent
0791f93dbd
commit
fb671adfd6
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.reroute;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -70,7 +72,11 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
state.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
state.writeTo(out);
|
||||
} else {
|
||||
ClusterModule.filterCustomsForPre63Clients(state).writeTo(out);
|
||||
}
|
||||
writeAcknowledged(out);
|
||||
RoutingExplanations.writeTo(explanations, out);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.state;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -94,7 +95,11 @@ public class ClusterStateResponse extends ActionResponse {
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
clusterName.writeTo(out);
|
||||
clusterState.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
clusterState.writeTo(out);
|
||||
} else {
|
||||
ClusterModule.filterCustomsForPre63Clients(clusterState).writeTo(out);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
totalCompressedSize.writeTo(out);
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
import org.elasticsearch.ingest.IngestMetadata;
|
||||
|
@ -84,6 +85,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
@ -150,6 +152,35 @@ public class ClusterModule extends AbstractModule {
|
|||
return entries;
|
||||
}
|
||||
|
||||
static final Set<String> PRE_6_3_METADATA_CUSTOMS_WHITE_LIST = Collections.unmodifiableSet(Sets.newHashSet(
|
||||
IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetaData.TYPE, ScriptMetaData.TYPE));
|
||||
|
||||
static final Set<String> PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST = Collections.unmodifiableSet(Sets.newHashSet(
|
||||
RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE));
|
||||
|
||||
/**
|
||||
* For interoperability with transport clients older than 6.3, we need to strip customs
|
||||
* from the cluster state that the client might not be able to deserialize
|
||||
*
|
||||
* @param clusterState the cluster state to filter the customs from
|
||||
* @return the adapted cluster state
|
||||
*/
|
||||
public static ClusterState filterCustomsForPre63Clients(ClusterState clusterState) {
|
||||
final ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
clusterState.customs().keysIt().forEachRemaining(name -> {
|
||||
if (PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST.contains(name) == false) {
|
||||
builder.removeCustom(name);
|
||||
}
|
||||
});
|
||||
final MetaData.Builder metaBuilder = MetaData.builder(clusterState.metaData());
|
||||
clusterState.metaData().customs().keysIt().forEachRemaining(name -> {
|
||||
if (PRE_6_3_METADATA_CUSTOMS_WHITE_LIST.contains(name) == false) {
|
||||
metaBuilder.removeCustom(name);
|
||||
}
|
||||
});
|
||||
return builder.metaData(metaBuilder).build();
|
||||
}
|
||||
|
||||
public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
|
||||
List<NamedXContentRegistry.Entry> entries = new ArrayList<>();
|
||||
// Metadata
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
||||
|
@ -251,4 +253,29 @@ public class ClusterModuleTests extends ModuleTestCase {
|
|||
assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
|
||||
}
|
||||
}
|
||||
|
||||
public void testPre63CustomsFiltering() {
|
||||
final String whiteListedClusterCustom = randomFrom(ClusterModule.PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST);
|
||||
final String whiteListedMetaDataCustom = randomFrom(ClusterModule.PRE_6_3_METADATA_CUSTOMS_WHITE_LIST);
|
||||
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.putCustom(whiteListedClusterCustom, new RestoreInProgress())
|
||||
.putCustom("other", new RestoreInProgress())
|
||||
.metaData(MetaData.builder()
|
||||
.putCustom(whiteListedMetaDataCustom, new RepositoriesMetaData(Collections.emptyList()))
|
||||
.putCustom("other", new RepositoriesMetaData(Collections.emptyList()))
|
||||
.build())
|
||||
.build();
|
||||
|
||||
assertNotNull(clusterState.custom(whiteListedClusterCustom));
|
||||
assertNotNull(clusterState.custom("other"));
|
||||
assertNotNull(clusterState.metaData().custom(whiteListedMetaDataCustom));
|
||||
assertNotNull(clusterState.metaData().custom("other"));
|
||||
|
||||
final ClusterState fixedClusterState = ClusterModule.filterCustomsForPre63Clients(clusterState);
|
||||
|
||||
assertNotNull(fixedClusterState.custom(whiteListedClusterCustom));
|
||||
assertNull(fixedClusterState.custom("other"));
|
||||
assertNotNull(fixedClusterState.metaData().custom(whiteListedMetaDataCustom));
|
||||
assertNull(fixedClusterState.metaData().custom("other"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue