add node version to each request/response transport message

a version associated with each request, and having it associated with the stream when (de)serializing, can allow us in the future to better support cross version communication
This commit is contained in:
Shay Banon 2012-09-22 02:35:29 +02:00
parent cc7eb263be
commit aa435a288b
13 changed files with 158 additions and 64 deletions

View File

@ -97,24 +97,8 @@ public class Version implements Serializable {
public static Version fromId(int id) { public static Version fromId(int id) {
switch (id) { switch (id) {
case V_0_18_0_ID: case V_0_20_0_Beta1_ID:
return V_0_18_0; return V_0_20_0_Beta1;
case V_0_18_1_ID:
return V_0_18_1;
case V_0_18_2_ID:
return V_0_18_2;
case V_0_18_3_ID:
return V_0_18_3;
case V_0_18_4_ID:
return V_0_18_4;
case V_0_18_5_ID:
return V_0_18_5;
case V_0_18_6_ID:
return V_0_18_6;
case V_0_18_7_ID:
return V_0_18_7;
case V_0_18_8_ID:
return V_0_18_8;
case V_0_19_0_RC1_ID: case V_0_19_0_RC1_ID:
return V_0_19_0_RC1; return V_0_19_0_RC1;
@ -145,8 +129,24 @@ public class Version implements Serializable {
case V_0_19_10_ID: case V_0_19_10_ID:
return V_0_19_10; return V_0_19_10;
case V_0_20_0_Beta1_ID: case V_0_18_0_ID:
return V_0_20_0_Beta1; return V_0_18_0;
case V_0_18_1_ID:
return V_0_18_1;
case V_0_18_2_ID:
return V_0_18_2;
case V_0_18_3_ID:
return V_0_18_3;
case V_0_18_4_ID:
return V_0_18_4;
case V_0_18_5_ID:
return V_0_18_5;
case V_0_18_6_ID:
return V_0_18_6;
case V_0_18_7_ID:
return V_0_18_7;
case V_0_18_8_ID:
return V_0_18_8;
default: default:
return new Version(id, null); return new Version(id, null);
@ -220,4 +220,21 @@ public class Version implements Serializable {
} }
return sb.toString(); return sb.toString();
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Version version = (Version) o;
if (id != version.id) return false;
return true;
}
@Override
public int hashCode() {
return id;
}
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import java.io.EOFException; import java.io.EOFException;
@ -40,9 +41,16 @@ public abstract class CompressedStreamInput<T extends CompressorContext> extends
public CompressedStreamInput(StreamInput in, T context) throws IOException { public CompressedStreamInput(StreamInput in, T context) throws IOException {
this.in = in; this.in = in;
this.context = context; this.context = context;
super.setVersion(in.getVersion());
readHeader(in); readHeader(in);
} }
@Override
public StreamInput setVersion(Version version) {
in.setVersion(version);
return super.setVersion(version);
}
/** /**
* Expert!, resets to buffer start, without the need to decompress it again. * Expert!, resets to buffer start, without the need to decompress it again.
*/ */

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
@ -39,9 +40,16 @@ public abstract class CompressedStreamOutput<T extends CompressorContext> extend
public CompressedStreamOutput(StreamOutput out, T context) throws IOException { public CompressedStreamOutput(StreamOutput out, T context) throws IOException {
this.out = out; this.out = out;
this.context = context; this.context = context;
super.setVersion(out.getVersion());
writeHeader(out); writeHeader(out);
} }
@Override
public StreamOutput setVersion(Version version) {
out.setVersion(version);
return super.setVersion(version);
}
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
if (position >= uncompressedLength) { if (position >= uncompressedLength) {

View File

@ -1,5 +1,6 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
@ -16,6 +17,13 @@ public abstract class AdapterStreamInput extends StreamInput {
public AdapterStreamInput(StreamInput in) { public AdapterStreamInput(StreamInput in) {
this.in = in; this.in = in;
super.setVersion(in.getVersion());
}
@Override
public StreamInput setVersion(Version version) {
in.setVersion(version);
return super.setVersion(version);
} }
public void reset(StreamInput in) { public void reset(StreamInput in) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
@ -33,6 +34,13 @@ public class AdapterStreamOutput extends StreamOutput {
public AdapterStreamOutput(StreamOutput out) { public AdapterStreamOutput(StreamOutput out) {
this.out = out; this.out = out;
super.setVersion(out.getVersion());
}
@Override
public StreamOutput setVersion(Version version) {
out.setVersion(version);
return super.setVersion(version);
} }
public void setOut(StreamOutput out) { public void setOut(StreamOutput out) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
@ -36,6 +37,17 @@ import java.util.*;
*/ */
public abstract class StreamInput extends InputStream { public abstract class StreamInput extends InputStream {
private Version version = Version.CURRENT;
public Version getVersion() {
return this.version;
}
public StreamInput setVersion(Version version) {
this.version = version;
return this;
}
/** /**
* Reads and returns a single byte. * Reads and returns a single byte.
*/ */

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.UTF8StreamWriter; import org.elasticsearch.common.io.UTF8StreamWriter;
@ -37,6 +38,17 @@ import java.util.Map;
*/ */
public abstract class StreamOutput extends OutputStream { public abstract class StreamOutput extends OutputStream {
private Version version = Version.CURRENT;
public Version getVersion() {
return this.version;
}
public StreamOutput setVersion(Version version) {
this.version = version;
return this;
}
public boolean seekPositionSupported() { public boolean seekPositionSupported() {
return false; return false;
} }

View File

@ -394,6 +394,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
if (internal) { if (internal) {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true)); StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true));
Version version = Version.readVersion(input); Version version = Version.readVersion(input);
input.setVersion(version);
id = input.readInt(); id = input.readInt();
clusterName = ClusterName.readClusterName(input); clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input); requestingNodeX = readNode(input);

View File

@ -19,9 +19,10 @@
package org.elasticsearch.discovery.zen.publish; package org.elasticsearch.discovery.zen.publish;
import com.google.common.collect.Maps;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
@ -33,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
/** /**
* *
@ -66,28 +68,31 @@ public class PublishClusterStateAction extends AbstractComponent {
public void publish(ClusterState clusterState) { public void publish(ClusterState clusterState) {
DiscoveryNode localNode = nodesProvider.nodes().localNode(); DiscoveryNode localNode = nodesProvider.nodes().localNode();
// serialize the cluster state here, so we won't do it several times per node Map<Version, CachedStreamOutput.Entry> serializedStates = Maps.newHashMap();
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
byte[] clusterStateInBytes;
try { try {
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
clusterStateInBytes = cachedEntry.bytes().bytes().copyBytesArray().toBytes();
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
for (final DiscoveryNode node : clusterState.nodes()) { for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) { if (node.equals(localNode)) {
// no need to send to our self // no need to send to our self
continue; continue;
} }
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
CachedStreamOutput.Entry entry = serializedStates.get(node.version());
if (entry == null) {
try {
entry = CachedStreamOutput.popEntry();
StreamOutput stream = entry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(node.version());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
serializedStates.put(node.version(), entry);
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
}
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(clusterStateInBytes), new PublishClusterStateRequest(entry.bytes().bytes()),
TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
new VoidTransportResponseHandler(ThreadPool.Names.SAME) { new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@ -97,6 +102,11 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
}); });
} }
} finally {
for (CachedStreamOutput.Entry entry : serializedStates.values()) {
CachedStreamOutput.pushEntry(entry);
}
}
} }
class PublishClusterStateRequest implements Streamable { class PublishClusterStateRequest implements Streamable {
@ -106,8 +116,8 @@ public class PublishClusterStateAction extends AbstractComponent {
private PublishClusterStateRequest() { private PublishClusterStateRequest() {
} }
private PublishClusterStateRequest(byte[] clusterStateInBytes) { private PublishClusterStateRequest(BytesReference clusterStateInBytes) {
this.clusterStateInBytes = new BytesArray(clusterStateInBytes); this.clusterStateInBytes = clusterStateInBytes;
} }
@Override @Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
@ -86,8 +87,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
long requestId = buffer.readLong(); long requestId = buffer.readLong();
byte status = buffer.readByte(); byte status = buffer.readByte();
boolean isRequest = TransportStatus.isRequest(status); Version version = Version.fromId(buffer.readInt());
StreamInput wrappedStream; StreamInput wrappedStream;
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) { if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
@ -106,9 +106,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} else { } else {
wrappedStream = CachedStreamInput.cachedHandles(streamIn); wrappedStream = CachedStreamInput.cachedHandles(streamIn);
} }
wrappedStream.setVersion(version);
if (isRequest) { if (TransportStatus.isRequest(status)) {
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId); String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
@ -194,10 +195,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} }
} }
private String handleRequest(Channel channel, StreamInput buffer, long requestId) throws IOException { private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readUTF(); final String action = buffer.readUTF();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
try { try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action); final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) { if (handler == null) {

View File

@ -19,15 +19,16 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
/** /**
*/ */
public class NettyHeader { public class NettyHeader {
public static final int HEADER_SIZE = 2 + 4 + 8 + 1; public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4;
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status) { public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) {
int index = buffer.readerIndex(); int index = buffer.readerIndex();
buffer.setByte(index, 'E'); buffer.setByte(index, 'E');
index += 1; index += 1;
@ -39,5 +40,7 @@ public class NettyHeader {
buffer.setLong(index, requestId); buffer.setLong(index, requestId);
index += 8; index += 8;
buffer.setByte(index, status); buffer.setByte(index, status);
index += 1;
buffer.setInt(index, version.id);
} }
} }

View File

@ -520,18 +520,20 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(node.version());
stream.writeString(action); stream.writeString(action);
message.writeTo(stream); message.writeTo(stream);
stream.close(); stream.close();
} else { } else {
StreamOutput stream = cachedEntry.handles(); StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
stream.setVersion(node.version());
stream.writeString(action); stream.writeString(action);
message.writeTo(stream); message.writeTo(stream);
stream.close(); stream.close();
} }
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status); NettyHeader.writeHeader(buffer, requestId, status, node.version());
ChannelFuture future = targetChannel.write(buffer); ChannelFuture future = targetChannel.write(buffer);
future.addListener(new CacheFutureListener(cachedEntry)); future.addListener(new CacheFutureListener(cachedEntry));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -42,17 +43,18 @@ import java.io.NotSerializableException;
*/ */
public class NettyTransportChannel implements TransportChannel { public class NettyTransportChannel implements TransportChannel {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
private final NettyTransport transport; private final NettyTransport transport;
private final Version version;
private final String action; private final String action;
private final Channel channel; private final Channel channel;
private final long requestId; private final long requestId;
public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId) { public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) {
this.version = version;
this.transport = transport; this.transport = transport;
this.action = action; this.action = action;
this.channel = channel; this.channel = channel;
@ -83,16 +85,18 @@ public class NettyTransportChannel implements TransportChannel {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor()); StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.setVersion(version);
message.writeTo(stream); message.writeTo(stream);
stream.close(); stream.close();
} else { } else {
StreamOutput stream = cachedEntry.handles(); StreamOutput stream = cachedEntry.handles();
stream.setVersion(version);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE); cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
message.writeTo(stream); message.writeTo(stream);
stream.close(); stream.close();
} }
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status); NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer); ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
} }
@ -123,7 +127,7 @@ public class NettyTransportChannel implements TransportChannel {
status = TransportStatus.setError(status); status = TransportStatus.setError(status);
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer(); ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status); NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer); ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
} }