diff --git a/core/src/main/java/org/elasticsearch/common/transport/DummyTransportAddress.java b/core/src/main/java/org/elasticsearch/common/transport/DummyTransportAddress.java index a5912250154..dae4a37765d 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/DummyTransportAddress.java +++ b/core/src/main/java/org/elasticsearch/common/transport/DummyTransportAddress.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.transport; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; @@ -64,11 +63,6 @@ public class DummyTransportAddress implements TransportAddress { return 42; } - @Override - public DummyTransportAddress readFrom(StreamInput in) throws IOException { - return INSTANCE; - } - @Override public void writeTo(StreamOutput out) throws IOException { } diff --git a/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java b/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java index d6db17e7e24..94c1a2390ac 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java +++ b/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java @@ -31,24 +31,10 @@ import java.net.InetSocketAddress; * A transport address used for IP socket address (wraps {@link java.net.InetSocketAddress}). */ public final class InetSocketTransportAddress implements TransportAddress { - - public static final InetSocketTransportAddress PROTO = new InetSocketTransportAddress(); + public static final short TYPE_ID = 1; private final InetSocketAddress address; - public InetSocketTransportAddress(StreamInput in) throws IOException { - final int len = in.readByte(); - final byte[] a = new byte[len]; // 4 bytes (IPv4) or 16 bytes (IPv6) - in.readFully(a); - InetAddress inetAddress = InetAddress.getByAddress(a); - int port = in.readInt(); - this.address = new InetSocketAddress(inetAddress, port); - } - - private InetSocketTransportAddress() { - address = null; - } - public InetSocketTransportAddress(InetAddress address, int port) { this(new InetSocketAddress(address, port)); } @@ -63,9 +49,32 @@ public final class InetSocketTransportAddress implements TransportAddress { this.address = address; } + /** + * Read from a stream. + */ + public InetSocketTransportAddress(StreamInput in) throws IOException { + final int len = in.readByte(); + final byte[] a = new byte[len]; // 4 bytes (IPv4) or 16 bytes (IPv6) + in.readFully(a); + InetAddress inetAddress = InetAddress.getByAddress(a); + int port = in.readInt(); + this.address = new InetSocketAddress(inetAddress, port); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6) + out.writeByte((byte) bytes.length); // 1 byte + out.write(bytes, 0, bytes.length); + // don't serialize scope ids over the network!!!! + // these only make sense with respect to the local machine, and will only formulate + // the address incorrectly remotely. + out.writeInt(address.getPort()); + } + @Override public short uniqueAddressTypeId() { - return 1; + return TYPE_ID; } @Override @@ -98,23 +107,6 @@ public final class InetSocketTransportAddress implements TransportAddress { return this.address; } - @Override - public TransportAddress readFrom(StreamInput in) throws IOException { - return new InetSocketTransportAddress(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6) - out.writeByte((byte) bytes.length); // 1 byte - out.write(bytes, 0, bytes.length); - // don't serialize scope ids over the network!!!! - // these only make sense with respect to the local machine, and will only formulate - // the address incorrectly remotely. - out.writeInt(address.getPort()); - } - - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/org/elasticsearch/common/transport/LocalTransportAddress.java b/core/src/main/java/org/elasticsearch/common/transport/LocalTransportAddress.java index c78cf62988a..9ded8dd23e1 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/LocalTransportAddress.java +++ b/core/src/main/java/org/elasticsearch/common/transport/LocalTransportAddress.java @@ -28,17 +28,24 @@ import java.io.IOException; * */ public final class LocalTransportAddress implements TransportAddress { - - public static final LocalTransportAddress PROTO = new LocalTransportAddress("_na"); + public static final short TYPE_ID = 2; private String id; + public LocalTransportAddress(String id) { + this.id = id; + } + + /** + * Read from a stream. + */ public LocalTransportAddress(StreamInput in) throws IOException { id = in.readString(); } - public LocalTransportAddress(String id) { - this.id = id; + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); } public String id() { @@ -47,7 +54,7 @@ public final class LocalTransportAddress implements TransportAddress { @Override public short uniqueAddressTypeId() { - return 2; + return TYPE_ID; } @Override @@ -75,16 +82,6 @@ public final class LocalTransportAddress implements TransportAddress { return 0; } - @Override - public LocalTransportAddress readFrom(StreamInput in) throws IOException { - return new LocalTransportAddress(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/org/elasticsearch/common/transport/TransportAddressSerializers.java b/core/src/main/java/org/elasticsearch/common/transport/TransportAddressSerializers.java index 026f8492aba..d5db2558163 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/TransportAddressSerializers.java +++ b/core/src/main/java/org/elasticsearch/common/transport/TransportAddressSerializers.java @@ -21,8 +21,7 @@ package org.elasticsearch.common.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; import java.util.HashMap; @@ -31,45 +30,35 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; /** - * A global registry of all different types of {@link org.elasticsearch.common.transport.TransportAddress} allowing - * to perform serialization of them. - *

- * By default, adds {@link org.elasticsearch.common.transport.InetSocketTransportAddress}. - * - * + * A global registry of all supported types of {@link TransportAddress}s. This registry is not open for modification by plugins. */ public abstract class TransportAddressSerializers { - - private static final ESLogger logger = Loggers.getLogger(TransportAddressSerializers.class); - - private static final Map ADDRESS_REGISTRY; + private static final Map> ADDRESS_REGISTRY; static { - Map registry = new HashMap<>(); - try { - addAddressType(registry, DummyTransportAddress.INSTANCE); - addAddressType(registry, InetSocketTransportAddress.PROTO); - addAddressType(registry, LocalTransportAddress.PROTO); - } catch (Exception e) { - logger.warn("Failed to setup TransportAddresses", e); - } + Map> registry = new HashMap<>(); + addAddressType(registry, DummyTransportAddress.INSTANCE.uniqueAddressTypeId(), (in) -> DummyTransportAddress.INSTANCE); + addAddressType(registry, InetSocketTransportAddress.TYPE_ID, InetSocketTransportAddress::new); + addAddressType(registry, LocalTransportAddress.TYPE_ID, LocalTransportAddress::new); ADDRESS_REGISTRY = unmodifiableMap(registry); } - public static synchronized void addAddressType(Map registry, TransportAddress address) throws Exception { - if (registry.containsKey(address.uniqueAddressTypeId())) { - throw new IllegalStateException("Address [" + address.uniqueAddressTypeId() + "] already bound"); + private static void addAddressType(Map> registry, short uniqueAddressTypeId, + Writeable.Reader address) { + if (registry.containsKey(uniqueAddressTypeId)) { + throw new IllegalStateException("Address [" + uniqueAddressTypeId + "] already bound"); } - registry.put(address.uniqueAddressTypeId(), address); + registry.put(uniqueAddressTypeId, address); } public static TransportAddress addressFromStream(StreamInput input) throws IOException { + // TODO why don't we just use named writeables here? short addressUniqueId = input.readShort(); - TransportAddress addressType = ADDRESS_REGISTRY.get(addressUniqueId); + Writeable.Reader addressType = ADDRESS_REGISTRY.get(addressUniqueId); if (addressType == null) { throw new IOException("No transport address mapped to [" + addressUniqueId + "]"); } - return addressType.readFrom(input); + return addressType.read(input); } public static void addressToStream(StreamOutput out, TransportAddress address) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 5837e0e193b..15b1463e4d0 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -251,7 +251,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { assertEquals(100L, DiskThresholdDecider.getShardSize(test_1, info)); assertEquals(10L, DiskThresholdDecider.getShardSize(test_0, info)); - RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, + RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"), emptyMap(), emptySet(), Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2)); assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null")); assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null")); @@ -270,7 +270,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { ShardRoutingHelper.relocate(other_0, "node1"); - node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO, + node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"), emptyMap(), emptySet(), Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard())); if (other_0.primary()) { assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));