diff --git a/src/main/java/org/elasticsearch/Version.java b/src/main/java/org/elasticsearch/Version.java index 60e092fa5bc..8d6ffa9dea1 100644 --- a/src/main/java/org/elasticsearch/Version.java +++ b/src/main/java/org/elasticsearch/Version.java @@ -97,24 +97,8 @@ public class Version implements Serializable { public static Version fromId(int id) { switch (id) { - case V_0_18_0_ID: - return V_0_18_0; - case V_0_18_1_ID: - return V_0_18_1; - case V_0_18_2_ID: - return V_0_18_2; - case V_0_18_3_ID: - return V_0_18_3; - case V_0_18_4_ID: - return V_0_18_4; - case V_0_18_5_ID: - return V_0_18_5; - case V_0_18_6_ID: - return V_0_18_6; - case V_0_18_7_ID: - return V_0_18_7; - case V_0_18_8_ID: - return V_0_18_8; + case V_0_20_0_Beta1_ID: + return V_0_20_0_Beta1; case V_0_19_0_RC1_ID: return V_0_19_0_RC1; @@ -145,8 +129,24 @@ public class Version implements Serializable { case V_0_19_10_ID: return V_0_19_10; - case V_0_20_0_Beta1_ID: - return V_0_20_0_Beta1; + case V_0_18_0_ID: + return V_0_18_0; + case V_0_18_1_ID: + return V_0_18_1; + case V_0_18_2_ID: + return V_0_18_2; + case V_0_18_3_ID: + return V_0_18_3; + case V_0_18_4_ID: + return V_0_18_4; + case V_0_18_5_ID: + return V_0_18_5; + case V_0_18_6_ID: + return V_0_18_6; + case V_0_18_7_ID: + return V_0_18_7; + case V_0_18_8_ID: + return V_0_18_8; default: return new Version(id, null); @@ -220,4 +220,21 @@ public class Version implements Serializable { } return sb.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Version version = (Version) o; + + if (id != version.id) return false; + + return true; + } + + @Override + public int hashCode() { + return id; + } } diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java b/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java index 8587f67dfeb..184c91ec2b5 100644 --- a/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java +++ b/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.compress; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import java.io.EOFException; @@ -40,9 +41,16 @@ public abstract class CompressedStreamInput extends public CompressedStreamInput(StreamInput in, T context) throws IOException { this.in = in; this.context = context; + super.setVersion(in.getVersion()); readHeader(in); } + @Override + public StreamInput setVersion(Version version) { + in.setVersion(version); + return super.setVersion(version); + } + /** * Expert!, resets to buffer start, without the need to decompress it again. */ diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java b/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java index 7f95828a1ca..57f15b5e944 100644 --- a/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.compress; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; @@ -39,9 +40,16 @@ public abstract class CompressedStreamOutput extend public CompressedStreamOutput(StreamOutput out, T context) throws IOException { this.out = out; this.context = context; + super.setVersion(out.getVersion()); writeHeader(out); } + @Override + public StreamOutput setVersion(Version version) { + out.setVersion(version); + return super.setVersion(version); + } + @Override public void write(int b) throws IOException { if (position >= uncompressedLength) { diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java index 0c781e694b7..cafd5c485da 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java @@ -1,5 +1,6 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.text.Text; @@ -16,6 +17,13 @@ public abstract class AdapterStreamInput extends StreamInput { public AdapterStreamInput(StreamInput in) { this.in = in; + super.setVersion(in.getVersion()); + } + + @Override + public StreamInput setVersion(Version version) { + in.setVersion(version); + return super.setVersion(version); } public void reset(StreamInput in) { diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java index 97fbc098ddd..5fc112c404c 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.text.Text; @@ -33,6 +34,13 @@ public class AdapterStreamOutput extends StreamOutput { public AdapterStreamOutput(StreamOutput out) { this.out = out; + super.setVersion(out.getVersion()); + } + + @Override + public StreamOutput setVersion(Version version) { + out.setVersion(version); + return super.setVersion(version); } public void setOut(StreamOutput out) { diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 32e07c224e5..d692c9f4cb1 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -36,6 +37,17 @@ import java.util.*; */ public abstract class StreamInput extends InputStream { + private Version version = Version.CURRENT; + + public Version getVersion() { + return this.version; + } + + public StreamInput setVersion(Version version) { + this.version = version; + return this; + } + /** * Reads and returns a single byte. */ diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 9aeae0cda55..1d87a490320 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.UTF8StreamWriter; @@ -37,6 +38,17 @@ import java.util.Map; */ public abstract class StreamOutput extends OutputStream { + private Version version = Version.CURRENT; + + public Version getVersion() { + return this.version; + } + + public StreamOutput setVersion(Version version) { + this.version = version; + return this; + } + public boolean seekPositionSupported() { return false; } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 3754173a831..37487dc6dd9 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -394,6 +394,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem if (internal) { StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true)); Version version = Version.readVersion(input); + input.setVersion(version); id = input.readInt(); clusterName = ClusterName.readClusterName(input); requestingNodeX = readNode(input); diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 1b6398d3843..77772959815 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -19,9 +19,10 @@ package org.elasticsearch.discovery.zen.publish; +import com.google.common.collect.Maps; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.Compressor; @@ -33,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Map; /** * @@ -66,36 +68,44 @@ public class PublishClusterStateAction extends AbstractComponent { public void publish(ClusterState clusterState) { DiscoveryNode localNode = nodesProvider.nodes().localNode(); - // serialize the cluster state here, so we won't do it several times per node - CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - byte[] clusterStateInBytes; + Map serializedStates = Maps.newHashMap(); try { - StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); - ClusterState.Builder.writeTo(clusterState, stream); - stream.close(); - clusterStateInBytes = cachedEntry.bytes().bytes().copyBytesArray().toBytes(); - } catch (Exception e) { - logger.warn("failed to serialize cluster_state before publishing it to nodes", e); - return; - } finally { - CachedStreamOutput.pushEntry(cachedEntry); - } + for (final DiscoveryNode node : clusterState.nodes()) { + if (node.equals(localNode)) { + // no need to send to our self + continue; + } + // try and serialize the cluster state once (or per version), so we don't serialize it + // per node when we send it over the wire, compress it while we are at it... + CachedStreamOutput.Entry entry = serializedStates.get(node.version()); + if (entry == null) { + try { + entry = CachedStreamOutput.popEntry(); + StreamOutput stream = entry.handles(CompressorFactory.defaultCompressor()); + stream.setVersion(node.version()); + ClusterState.Builder.writeTo(clusterState, stream); + stream.close(); + serializedStates.put(node.version(), entry); + } catch (Exception e) { + logger.warn("failed to serialize cluster_state before publishing it to nodes", e); + return; + } + } + transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, + new PublishClusterStateRequest(entry.bytes().bytes()), + TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes - for (final DiscoveryNode node : clusterState.nodes()) { - if (node.equals(localNode)) { - // no need to send to our self - continue; + new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); + } + }); + } + } finally { + for (CachedStreamOutput.Entry entry : serializedStates.values()) { + CachedStreamOutput.pushEntry(entry); } - transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, - new PublishClusterStateRequest(clusterStateInBytes), - TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes - - new VoidTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); - } - }); } } @@ -106,8 +116,8 @@ public class PublishClusterStateAction extends AbstractComponent { private PublishClusterStateRequest() { } - private PublishClusterStateRequest(byte[] clusterStateInBytes) { - this.clusterStateInBytes = new BytesArray(clusterStateInBytes); + private PublishClusterStateRequest(BytesReference clusterStateInBytes) { + this.clusterStateInBytes = clusterStateInBytes; } @Override diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 495b1da0fc6..888175bba8d 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; @@ -86,8 +87,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { long requestId = buffer.readLong(); byte status = buffer.readByte(); - boolean isRequest = TransportStatus.isRequest(status); - + Version version = Version.fromId(buffer.readInt()); StreamInput wrappedStream; if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) { @@ -106,9 +106,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } else { wrappedStream = CachedStreamInput.cachedHandles(streamIn); } + wrappedStream.setVersion(version); - if (isRequest) { - String action = handleRequest(ctx.getChannel(), wrappedStream, requestId); + if (TransportStatus.isRequest(status)) { + String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version); if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) { logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); @@ -194,10 +195,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } - private String handleRequest(Channel channel, StreamInput buffer, long requestId) throws IOException { + private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readUTF(); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java b/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java index b0832c84844..e9dceadfecf 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java @@ -19,15 +19,16 @@ package org.elasticsearch.transport.netty; +import org.elasticsearch.Version; import org.jboss.netty.buffer.ChannelBuffer; /** */ public class NettyHeader { - public static final int HEADER_SIZE = 2 + 4 + 8 + 1; + public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; - public static void writeHeader(ChannelBuffer buffer, long requestId, byte status) { + public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) { int index = buffer.readerIndex(); buffer.setByte(index, 'E'); index += 1; @@ -39,5 +40,7 @@ public class NettyHeader { buffer.setLong(index, requestId); index += 8; buffer.setByte(index, status); + index += 1; + buffer.setInt(index, version.id); } } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 17985e82888..e8e02a01b42 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -520,18 +520,20 @@ public class NettyTransport extends AbstractLifecycleComponent implem status = TransportStatus.setCompress(status); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); + stream.setVersion(node.version()); stream.writeString(action); message.writeTo(stream); stream.close(); } else { StreamOutput stream = cachedEntry.handles(); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); + stream.setVersion(node.version()); stream.writeString(action); message.writeTo(stream); stream.close(); } ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status); + NettyHeader.writeHeader(buffer, requestId, status, node.version()); ChannelFuture future = targetChannel.write(buffer); future.addListener(new CacheFutureListener(cachedEntry)); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 2254e2cfead..04850d98e7d 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; +import org.elasticsearch.Version; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -42,17 +43,18 @@ import java.io.NotSerializableException; */ public class NettyTransportChannel implements TransportChannel { - private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; - private final NettyTransport transport; + private final Version version; + private final String action; private final Channel channel; private final long requestId; - public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId) { + public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) { + this.version = version; this.transport = transport; this.action = action; this.channel = channel; @@ -83,16 +85,18 @@ public class NettyTransportChannel implements TransportChannel { status = TransportStatus.setCompress(status); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); + stream.setVersion(version); message.writeTo(stream); stream.close(); } else { StreamOutput stream = cachedEntry.handles(); + stream.setVersion(version); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); message.writeTo(stream); stream.close(); } ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status); + NettyHeader.writeHeader(buffer, requestId, status, version); ChannelFuture future = channel.write(buffer); future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); } @@ -123,7 +127,7 @@ public class NettyTransportChannel implements TransportChannel { status = TransportStatus.setError(status); ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status); + NettyHeader.writeHeader(buffer, requestId, status, version); ChannelFuture future = channel.write(buffer); future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); }