bytes reference to know how to convert to channel buffer, so it can be used directly when sending it over with netty

This commit is contained in:
Shay Banon 2012-07-29 16:50:43 +02:00
parent 7edafcf9a0
commit 408a74206f
8 changed files with 36 additions and 13 deletions

View File

@ -23,6 +23,8 @@ import com.google.common.base.Charsets;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import java.io.IOException;
@ -103,6 +105,11 @@ public class ByteBufferBytesReference implements BytesReference {
return new BytesArray(toBytes());
}
@Override
public ChannelBuffer toChannelBuffer() {
return ChannelBuffers.wrappedBuffer(buffer);
}
@Override
public boolean hasArray() {
return buffer.hasArray();

View File

@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import java.io.IOException;
import java.io.OutputStream;
@ -99,6 +101,11 @@ public class BytesArray implements BytesReference {
return new BytesArray(Arrays.copyOfRange(bytes, offset, offset + length));
}
@Override
public ChannelBuffer toChannelBuffer() {
return ChannelBuffers.wrappedBuffer(bytes, offset, length);
}
@Override
public boolean hasArray() {
return true;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.bytes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
import java.io.OutputStream;
@ -69,6 +70,11 @@ public interface BytesReference {
*/
BytesArray copyBytesArray();
/**
* Returns the bytes as a channel buffer.
*/
ChannelBuffer toChannelBuffer();
/**
* Is there an underlying byte array for this bytes reference.
*/

View File

@ -82,6 +82,11 @@ public class ChannelBufferBytesReference implements BytesReference {
return new BytesArray(copy);
}
@Override
public ChannelBuffer toChannelBuffer() {
return buffer.duplicate();
}
@Override
public boolean hasArray() {
return buffer.hasArray();

View File

@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import java.io.IOException;
import java.io.OutputStream;
@ -93,6 +95,11 @@ public class HashedBytesArray implements BytesReference {
return new BytesArray(copy);
}
@Override
public ChannelBuffer toChannelBuffer() {
return ChannelBuffers.wrappedBuffer(bytes, 0, bytes.length);
}
@Override
public boolean hasArray() {
return true;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpChannel;
@ -99,8 +98,7 @@ public class NettyHttpChannel implements HttpChannel {
XContentBuilder builder = ((XContentRestResponse) response).builder();
if (builder.payload() instanceof CachedStreamOutput.Entry) {
releaseContentListener = new NettyTransport.CacheFutureListener((CachedStreamOutput.Entry) builder.payload());
BytesReference bytes = builder.bytes();
buf = ChannelBuffers.wrappedBuffer(bytes.array(), bytes.arrayOffset(), bytes.length());
buf = builder.bytes().toChannelBuffer();
} else if (response.contentThreadSafe()) {
buf = ChannelBuffers.wrappedBuffer(response.content(), 0, response.contentLength());
} else {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
@ -47,7 +46,6 @@ import org.elasticsearch.transport.support.TransportStreams;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@ -481,8 +479,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
TransportStreams.buildRequest(cachedEntry, requestId, action, message, options);
BytesReference bytes = cachedEntry.bytes().bytes();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(bytes.array(), bytes.arrayOffset(), bytes.length());
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
ChannelFuture future = targetChannel.write(buffer);
future.addListener(new CacheFutureListener(cachedEntry));
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
@ -30,7 +29,6 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.support.TransportStreams;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@ -76,8 +74,7 @@ public class NettyTransportChannel implements TransportChannel {
}
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
TransportStreams.buildResponse(cachedEntry, requestId, message, options);
BytesReference bytes = cachedEntry.bytes().bytes();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(bytes.array(), bytes.arrayOffset(), bytes.length());
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
}
@ -102,8 +99,7 @@ public class NettyTransportChannel implements TransportChannel {
too.writeObject(tx);
too.close();
}
BytesReference bytes = stream.bytes();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(bytes.array(), bytes.arrayOffset(), bytes.length());
ChannelBuffer buffer = stream.bytes().toChannelBuffer();
buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));