Remove PROTOs from TransportAddresses

We have this TransportAddressSerializers that works similarly to
NamedWriteables except it uses shorts instead of streams. I don't know
enough to propose removing it in favor of NamedWriteables to I just ported
it to using Writeable.Reader and left it alone.

Relates to #17085
This commit is contained in:
Nik Everett 2016-04-18 17:54:18 -04:00
parent 34da01f794
commit 6941966b16
5 changed files with 54 additions and 82 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
@ -64,11 +63,6 @@ public class DummyTransportAddress implements TransportAddress {
return 42;
}
@Override
public DummyTransportAddress readFrom(StreamInput in) throws IOException {
return INSTANCE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}

View File

@ -31,24 +31,10 @@ import java.net.InetSocketAddress;
* A transport address used for IP socket address (wraps {@link java.net.InetSocketAddress}).
*/
public final class InetSocketTransportAddress implements TransportAddress {
public static final InetSocketTransportAddress PROTO = new InetSocketTransportAddress();
public static final short TYPE_ID = 1;
private final InetSocketAddress address;
public InetSocketTransportAddress(StreamInput in) throws IOException {
final int len = in.readByte();
final byte[] a = new byte[len]; // 4 bytes (IPv4) or 16 bytes (IPv6)
in.readFully(a);
InetAddress inetAddress = InetAddress.getByAddress(a);
int port = in.readInt();
this.address = new InetSocketAddress(inetAddress, port);
}
private InetSocketTransportAddress() {
address = null;
}
public InetSocketTransportAddress(InetAddress address, int port) {
this(new InetSocketAddress(address, port));
}
@ -63,9 +49,32 @@ public final class InetSocketTransportAddress implements TransportAddress {
this.address = address;
}
/**
* Read from a stream.
*/
public InetSocketTransportAddress(StreamInput in) throws IOException {
final int len = in.readByte();
final byte[] a = new byte[len]; // 4 bytes (IPv4) or 16 bytes (IPv6)
in.readFully(a);
InetAddress inetAddress = InetAddress.getByAddress(a);
int port = in.readInt();
this.address = new InetSocketAddress(inetAddress, port);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6)
out.writeByte((byte) bytes.length); // 1 byte
out.write(bytes, 0, bytes.length);
// don't serialize scope ids over the network!!!!
// these only make sense with respect to the local machine, and will only formulate
// the address incorrectly remotely.
out.writeInt(address.getPort());
}
@Override
public short uniqueAddressTypeId() {
return 1;
return TYPE_ID;
}
@Override
@ -98,23 +107,6 @@ public final class InetSocketTransportAddress implements TransportAddress {
return this.address;
}
@Override
public TransportAddress readFrom(StreamInput in) throws IOException {
return new InetSocketTransportAddress(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6)
out.writeByte((byte) bytes.length); // 1 byte
out.write(bytes, 0, bytes.length);
// don't serialize scope ids over the network!!!!
// these only make sense with respect to the local machine, and will only formulate
// the address incorrectly remotely.
out.writeInt(address.getPort());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -28,17 +28,24 @@ import java.io.IOException;
*
*/
public final class LocalTransportAddress implements TransportAddress {
public static final LocalTransportAddress PROTO = new LocalTransportAddress("_na");
public static final short TYPE_ID = 2;
private String id;
public LocalTransportAddress(String id) {
this.id = id;
}
/**
* Read from a stream.
*/
public LocalTransportAddress(StreamInput in) throws IOException {
id = in.readString();
}
public LocalTransportAddress(String id) {
this.id = id;
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
}
public String id() {
@ -47,7 +54,7 @@ public final class LocalTransportAddress implements TransportAddress {
@Override
public short uniqueAddressTypeId() {
return 2;
return TYPE_ID;
}
@Override
@ -75,16 +82,6 @@ public final class LocalTransportAddress implements TransportAddress {
return 0;
}
@Override
public LocalTransportAddress readFrom(StreamInput in) throws IOException {
return new LocalTransportAddress(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -21,8 +21,7 @@ package org.elasticsearch.common.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.HashMap;
@ -31,45 +30,35 @@ import java.util.Map;
import static java.util.Collections.unmodifiableMap;
/**
* A global registry of all different types of {@link org.elasticsearch.common.transport.TransportAddress} allowing
* to perform serialization of them.
* <p>
* By default, adds {@link org.elasticsearch.common.transport.InetSocketTransportAddress}.
*
*
* A global registry of all supported types of {@link TransportAddress}s. This registry is not open for modification by plugins.
*/
public abstract class TransportAddressSerializers {
private static final ESLogger logger = Loggers.getLogger(TransportAddressSerializers.class);
private static final Map<Short, TransportAddress> ADDRESS_REGISTRY;
private static final Map<Short, Writeable.Reader<TransportAddress>> ADDRESS_REGISTRY;
static {
Map<Short, TransportAddress> registry = new HashMap<>();
try {
addAddressType(registry, DummyTransportAddress.INSTANCE);
addAddressType(registry, InetSocketTransportAddress.PROTO);
addAddressType(registry, LocalTransportAddress.PROTO);
} catch (Exception e) {
logger.warn("Failed to setup TransportAddresses", e);
}
Map<Short, Writeable.Reader<TransportAddress>> registry = new HashMap<>();
addAddressType(registry, DummyTransportAddress.INSTANCE.uniqueAddressTypeId(), (in) -> DummyTransportAddress.INSTANCE);
addAddressType(registry, InetSocketTransportAddress.TYPE_ID, InetSocketTransportAddress::new);
addAddressType(registry, LocalTransportAddress.TYPE_ID, LocalTransportAddress::new);
ADDRESS_REGISTRY = unmodifiableMap(registry);
}
public static synchronized void addAddressType(Map<Short, TransportAddress> registry, TransportAddress address) throws Exception {
if (registry.containsKey(address.uniqueAddressTypeId())) {
throw new IllegalStateException("Address [" + address.uniqueAddressTypeId() + "] already bound");
private static void addAddressType(Map<Short, Writeable.Reader<TransportAddress>> registry, short uniqueAddressTypeId,
Writeable.Reader<TransportAddress> address) {
if (registry.containsKey(uniqueAddressTypeId)) {
throw new IllegalStateException("Address [" + uniqueAddressTypeId + "] already bound");
}
registry.put(address.uniqueAddressTypeId(), address);
registry.put(uniqueAddressTypeId, address);
}
public static TransportAddress addressFromStream(StreamInput input) throws IOException {
// TODO why don't we just use named writeables here?
short addressUniqueId = input.readShort();
TransportAddress addressType = ADDRESS_REGISTRY.get(addressUniqueId);
Writeable.Reader<TransportAddress> addressType = ADDRESS_REGISTRY.get(addressUniqueId);
if (addressType == null) {
throw new IOException("No transport address mapped to [" + addressUniqueId + "]");
}
return addressType.readFrom(input);
return addressType.read(input);
}
public static void addressToStream(StreamOutput out, TransportAddress address) throws IOException {

View File

@ -251,7 +251,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
assertEquals(100L, DiskThresholdDecider.getShardSize(test_1, info));
assertEquals(10L, DiskThresholdDecider.getShardSize(test_0, info));
RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO,
RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2));
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, true, "/dev/null"));
@ -270,7 +270,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
ShardRoutingHelper.relocate(other_0, "node1");
node = new RoutingNode("node1", new DiscoveryNode("node1", LocalTransportAddress.PROTO,
node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"),
emptyMap(), emptySet(), Version.CURRENT), Arrays.asList(test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard()));
if (other_0.primary()) {
assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, info, false, "/dev/null"));