read reference binary data in publish cluster state

This commit is contained in:
Shay Banon 2012-01-08 20:07:13 +02:00
parent 858195351b
commit 0cc906aa21

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.settings.Settings;
@ -97,25 +98,23 @@ public class PublishClusterStateAction extends AbstractComponent {
class PublishClusterStateRequest implements Streamable {
private byte[] clusterStateInBytes;
BytesHolder clusterStateInBytes;
private PublishClusterStateRequest() {
}
private PublishClusterStateRequest(byte[] clusterStateInBytes) {
this.clusterStateInBytes = clusterStateInBytes;
this.clusterStateInBytes = new BytesHolder(clusterStateInBytes);
}
@Override
public void readFrom(StreamInput in) throws IOException {
clusterStateInBytes = new byte[in.readVInt()];
in.readFully(clusterStateInBytes);
clusterStateInBytes = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(clusterStateInBytes.length);
out.writeBytes(clusterStateInBytes);
out.writeBytesHolder(clusterStateInBytes);
}
}
@ -130,7 +129,7 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes, false));
StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length(), false));
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
listener.onNewClusterState(clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);