From 0941d157beee9b0c9a321867511baa62d123c916 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 8 Jan 2012 13:13:42 +0200 Subject: [PATCH] enhance StreamInput to allow to read a bytes ref, and have it directly read it from netty buffers / bytes, apply it to index/percolate/search/count (for now) --- .../action/count/CountRequest.java | 17 +++---- .../action/count/ShardCountRequest.java | 18 ++++---- .../action/index/IndexRequest.java | 26 ++++++----- .../action/percolate/PercolateRequest.java | 22 ++++++--- .../action/search/SearchRequest.java | 41 +++++------------ .../org/elasticsearch/common/BytesHolder.java | 4 +- .../common/io/stream/AdapterStreamInput.java | 7 +++ .../common/io/stream/AdapterStreamOutput.java | 12 +++++ .../common/io/stream/BytesStreamInput.java | 10 +++++ .../common/io/stream/StreamInput.java | 22 +++++++++ .../common/io/stream/StreamOutput.java | 11 +++++ .../internal/InternalSearchRequest.java | 45 ++++++------------- .../netty/ChannelBufferStreamInput.java | 17 ++++++- .../netty/MessageChannelHandler.java | 3 +- 14 files changed, 153 insertions(+), 102 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/count/CountRequest.java b/src/main/java/org/elasticsearch/action/count/CountRequest.java index 4a352aaef8e..bd604fb5ac3 100644 --- a/src/main/java/org/elasticsearch/action/count/CountRequest.java +++ b/src/main/java/org/elasticsearch/action/count/CountRequest.java @@ -25,10 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Required; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.*; import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -48,7 +45,6 @@ import java.util.Map; *

The request requires the query source to be set either using {@link #query(org.elasticsearch.index.query.QueryBuilder)}, * or {@link #query(byte[])}. * - * * @see CountResponse * @see org.elasticsearch.client.Client#count(CountRequest) * @see org.elasticsearch.client.Requests#countRequest(String...) @@ -292,11 +288,11 @@ public class CountRequest extends BroadcastOperationRequest { routing = in.readUTF(); } + BytesHolder bytes = in.readBytesReference(); querySourceUnsafe = false; - querySourceOffset = 0; - querySourceLength = in.readVInt(); - querySource = new byte[querySourceLength]; - in.readFully(querySource); + querySource = bytes.bytes(); + querySourceOffset = bytes.offset(); + querySourceLength = bytes.length(); int typesSize = in.readVInt(); if (typesSize > 0) { @@ -325,8 +321,7 @@ public class CountRequest extends BroadcastOperationRequest { out.writeUTF(routing); } - out.writeVInt(querySourceLength); - out.writeBytes(querySource, querySourceOffset, querySourceLength); + out.writeBytesHolder(querySource, querySourceOffset, querySourceLength()); out.writeVInt(types.length); for (String type : types) { diff --git a/src/main/java/org/elasticsearch/action/count/ShardCountRequest.java b/src/main/java/org/elasticsearch/action/count/ShardCountRequest.java index 6aa5f7569bf..f0d06d730e7 100644 --- a/src/main/java/org/elasticsearch/action/count/ShardCountRequest.java +++ b/src/main/java/org/elasticsearch/action/count/ShardCountRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.count; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -29,8 +30,6 @@ import java.io.IOException; /** * Internal count request executed directly against a specific index shard. - * - * */ class ShardCountRequest extends BroadcastShardOperationRequest { @@ -87,10 +86,12 @@ class ShardCountRequest extends BroadcastShardOperationRequest { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); minScore = in.readFloat(); - querySourceLength = in.readVInt(); - querySourceOffset = 0; - querySource = new byte[querySourceLength]; - in.readFully(querySource); + + BytesHolder bytes = in.readBytesHolder(); + querySource = bytes.bytes(); + querySourceOffset = bytes.offset(); + querySourceLength = bytes.length(); + int typesSize = in.readVInt(); if (typesSize > 0) { types = new String[typesSize]; @@ -111,8 +112,9 @@ class ShardCountRequest extends BroadcastShardOperationRequest { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeFloat(minScore); - out.writeVInt(querySourceLength); - out.writeBytes(querySource, querySourceOffset, querySourceLength); + + out.writeBytesHolder(querySource, querySourceOffset, querySourceLength); + out.writeVInt(types.length); for (String type : types) { out.writeUTF(type); diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index a65340fea65..a674217b75c 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -32,10 +32,7 @@ import org.elasticsearch.action.support.replication.ShardReplicationOperationReq import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Required; -import org.elasticsearch.common.UUID; -import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.*; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -65,7 +62,6 @@ import static org.elasticsearch.action.Actions.addValidationError; *

*

If the {@link #id(String)} is not set, it will be automatically generated. * - * * @see IndexResponse * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) @@ -334,14 +330,23 @@ public class IndexRequest extends ShardReplicationOperationRequest { } public byte[] underlyingSource() { + if (sourceUnsafe) { + source(); + } return this.source; } public int underlyingSourceOffset() { + if (sourceUnsafe) { + source(); + } return this.sourceOffset; } public int underlyingSourceLength() { + if (sourceUnsafe) { + source(); + } return this.sourceLength; } @@ -692,11 +697,11 @@ public class IndexRequest extends ShardReplicationOperationRequest { timestamp = in.readUTF(); } ttl = in.readLong(); + BytesHolder bytes = in.readBytesReference(); sourceUnsafe = false; - sourceOffset = 0; - sourceLength = in.readVInt(); - source = new byte[sourceLength]; - in.readFully(source); + source = bytes.bytes(); + sourceOffset = bytes.offset(); + sourceLength = bytes.length(); opType = OpType.fromId(in.readByte()); refresh = in.readBoolean(); @@ -736,8 +741,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { out.writeUTF(timestamp); } out.writeLong(ttl); - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); + out.writeBytesHolder(source, sourceOffset, sourceLength); out.writeByte(opType.id()); out.writeBoolean(refresh); out.writeLong(version); diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java b/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java index 55ba4e9e9fa..cb4916670f8 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Required; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.io.stream.StreamInput; @@ -103,14 +104,23 @@ public class PercolateRequest extends SingleCustomOperationRequest { } public byte[] underlyingSource() { + if (sourceUnsafe) { + source(); + } return this.source; } public int underlyingSourceOffset() { + if (sourceUnsafe) { + source(); + } return this.sourceOffset; } public int underlyingSourceLength() { + if (sourceUnsafe) { + source(); + } return this.sourceLength; } @@ -202,11 +212,11 @@ public class PercolateRequest extends SingleCustomOperationRequest { index = in.readUTF(); type = in.readUTF(); + BytesHolder bytes = in.readBytesReference(); sourceUnsafe = false; - sourceOffset = 0; - sourceLength = in.readVInt(); - source = new byte[sourceLength]; - in.readFully(source); + source = bytes.bytes(); + sourceOffset = bytes.offset(); + sourceLength = bytes.length(); } @Override @@ -214,8 +224,6 @@ public class PercolateRequest extends SingleCustomOperationRequest { super.writeTo(out); out.writeUTF(index); out.writeUTF(type); - - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); + out.writeBytesHolder(source, sourceOffset, sourceLength); } } diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/src/main/java/org/elasticsearch/action/search/SearchRequest.java index d621850864e..5dc611a78e6 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -25,7 +25,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.Bytes; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Unicode; @@ -56,7 +56,6 @@ import static org.elasticsearch.search.Scroll.readScroll; *

*

There is an option to specify an addition search source using the {@link #extraSource(org.elasticsearch.search.builder.SearchSourceBuilder)}. * - * * @see org.elasticsearch.client.Requests#searchRequest(String...) * @see org.elasticsearch.client.Client#search(SearchRequest) * @see SearchResponse @@ -551,25 +550,17 @@ public class SearchRequest implements ActionRequest { timeout = readTimeValue(in); } + BytesHolder bytes = in.readBytesReference(); sourceUnsafe = false; - sourceOffset = 0; - sourceLength = in.readVInt(); - if (sourceLength == 0) { - source = Bytes.EMPTY_ARRAY; - } else { - source = new byte[sourceLength]; - in.readFully(source); - } + source = bytes.bytes(); + sourceOffset = bytes.offset(); + sourceLength = bytes.length(); + bytes = in.readBytesReference(); extraSourceUnsafe = false; - extraSourceOffset = 0; - extraSourceLength = in.readVInt(); - if (extraSourceLength == 0) { - extraSource = Bytes.EMPTY_ARRAY; - } else { - extraSource = new byte[extraSourceLength]; - in.readFully(extraSource); - } + extraSource = bytes.bytes(); + extraSourceOffset = bytes.offset(); + extraSourceLength = bytes.length(); int typesSize = in.readVInt(); if (typesSize > 0) { @@ -621,18 +612,8 @@ public class SearchRequest implements ActionRequest { out.writeBoolean(true); timeout.writeTo(out); } - if (source == null) { - out.writeVInt(0); - } else { - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); - } - if (extraSource == null) { - out.writeVInt(0); - } else { - out.writeVInt(extraSourceLength); - out.writeBytes(extraSource, extraSourceOffset, extraSourceLength); - } + out.writeBytesHolder(source, sourceOffset, sourceLength); + out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength); out.writeVInt(types.length); for (String type : types) { out.writeUTF(type); diff --git a/src/main/java/org/elasticsearch/common/BytesHolder.java b/src/main/java/org/elasticsearch/common/BytesHolder.java index ef646392bb6..c95960c7739 100644 --- a/src/main/java/org/elasticsearch/common/BytesHolder.java +++ b/src/main/java/org/elasticsearch/common/BytesHolder.java @@ -28,6 +28,8 @@ import java.util.Arrays; public class BytesHolder implements Streamable { + public static final BytesHolder EMPTY = new BytesHolder(Bytes.EMPTY_ARRAY, 0, 0); + private byte[] bytes; private int offset; private int length; @@ -75,7 +77,7 @@ public class BytesHolder implements Streamable { offset = 0; length = in.readVInt(); bytes = new byte[length]; - in.readFully(bytes); + in.readBytes(bytes, 0, length); } @Override diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java index c5b61f33b6d..bb1ee572803 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamInput.java @@ -1,5 +1,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.BytesHolder; + import java.io.IOException; /** @@ -29,6 +31,11 @@ public abstract class AdapterStreamInput extends StreamInput { in.readBytes(b, offset, len); } + @Override + public BytesHolder readBytesReference() throws IOException { + return in.readBytesReference(); + } + @Override public void reset() throws IOException { in.reset(); diff --git a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java index 2010160407a..0f9a2ca31b7 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/AdapterStreamOutput.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.BytesHolder; + import java.io.IOException; /** @@ -74,6 +76,16 @@ public class AdapterStreamOutput extends StreamOutput { out.writeBytes(b, length); } + @Override + public void writeBytesHolder(byte[] bytes, int offset, int length) throws IOException { + out.writeBytesHolder(bytes, offset, length); + } + + @Override + public void writeBytesHolder(BytesHolder bytes) throws IOException { + out.writeBytesHolder(bytes); + } + @Override public void writeInt(int i) throws IOException { out.writeInt(i); diff --git a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java index 9ecc3a21343..8a4c0d7e75b 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamInput.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.BytesHolder; + import java.io.EOFException; import java.io.IOException; import java.io.UTFDataFormatException; @@ -44,6 +46,14 @@ public class BytesStreamInput extends StreamInput { this.count = Math.min(offset + length, buf.length); } + @Override + public BytesHolder readBytesReference() throws IOException { + int size = readVInt(); + BytesHolder bytes = new BytesHolder(buf, pos, size); + pos += size; + return bytes; + } + @Override public long skip(long n) throws IOException { if (pos + n > count) { diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 66f2a114b9d..3a3d67af34b 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import java.io.IOException; @@ -51,6 +52,27 @@ public abstract class StreamInput extends InputStream { */ public abstract void readBytes(byte[] b, int offset, int len) throws IOException; + /** + * Reads a fresh copy of the bytes. + */ + public BytesHolder readBytesHolder() throws IOException { + int size = readVInt(); + if (size == 0) { + return BytesHolder.EMPTY; + } + byte[] bytes = new byte[size]; + readBytes(bytes, 0, size); + return new BytesHolder(bytes, 0, size); + } + + /** + * Reads a bytes reference from this stream, might hold an actual reference to the underlying + * bytes of the stream. + */ + public BytesHolder readBytesReference() throws IOException { + return readBytesHolder(); + } + public void readFully(byte[] b) throws IOException { readBytes(b, 0, b.length); } diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 704ab40f469..488ca5727ce 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import java.io.IOException; @@ -70,6 +71,16 @@ public abstract class StreamOutput extends OutputStream { */ public abstract void writeBytes(byte[] b, int offset, int length) throws IOException; + public void writeBytesHolder(byte[] bytes, int offset, int length) throws IOException { + writeVInt(length); + writeBytes(bytes, offset, length); + } + + public void writeBytesHolder(BytesHolder bytes) throws IOException { + writeVInt(bytes.length()); + writeBytes(bytes.bytes(), bytes.offset(), bytes.length()); + } + public final void writeShort(short v) throws IOException { writeByte((byte) (v >> 8)); writeByte((byte) v); diff --git a/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java b/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java index 20f6013d930..66aef768097 100644 --- a/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java +++ b/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java @@ -21,7 +21,7 @@ package org.elasticsearch.search.internal; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Bytes; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -50,8 +50,6 @@ import static org.elasticsearch.search.Scroll.readScroll; * } * } * - * - * */ public class InternalSearchRequest implements Streamable { @@ -208,22 +206,17 @@ public class InternalSearchRequest implements Streamable { if (in.readBoolean()) { timeout = readTimeValue(in); } - sourceOffset = 0; - sourceLength = in.readVInt(); - if (sourceLength == 0) { - source = Bytes.EMPTY_ARRAY; - } else { - source = new byte[sourceLength]; - in.readFully(source); - } - extraSourceOffset = 0; - extraSourceLength = in.readVInt(); - if (extraSourceLength == 0) { - extraSource = Bytes.EMPTY_ARRAY; - } else { - extraSource = new byte[extraSourceLength]; - in.readFully(extraSource); - } + + BytesHolder bytes = in.readBytesReference(); + source = bytes.bytes(); + sourceOffset = bytes.offset(); + sourceLength = bytes.length(); + + bytes = in.readBytesReference(); + extraSource = bytes.bytes(); + extraSourceOffset = bytes.offset(); + extraSourceLength = bytes.length(); + int typesSize = in.readVInt(); if (typesSize > 0) { types = new String[typesSize]; @@ -261,18 +254,8 @@ public class InternalSearchRequest implements Streamable { out.writeBoolean(true); timeout.writeTo(out); } - if (source == null) { - out.writeVInt(0); - } else { - out.writeVInt(sourceLength); - out.writeBytes(source, sourceOffset, sourceLength); - } - if (extraSource == null) { - out.writeVInt(0); - } else { - out.writeVInt(extraSourceLength); - out.writeBytes(extraSource, extraSourceOffset, extraSourceLength); - } + out.writeBytesHolder(source, sourceOffset, sourceLength); + out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength); out.writeVInt(types.length); for (String type : types) { out.writeUTF(type); diff --git a/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java b/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java index f32d4a70183..8b15a64f019 100644 --- a/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java +++ b/src/main/java/org/elasticsearch/transport/netty/ChannelBufferStreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.io.stream.StreamInput; import org.jboss.netty.buffer.ChannelBuffer; @@ -27,8 +28,6 @@ import java.io.IOException; /** * A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}. - * - * */ public class ChannelBufferStreamInput extends StreamInput { @@ -46,6 +45,20 @@ public class ChannelBufferStreamInput extends StreamInput { buffer.markReaderIndex(); } + @Override + public BytesHolder readBytesReference() throws IOException { + // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh + // buffer, or in the cumlation buffer, which is cleaned each time + // so: we can actually return a reference if this is an array backed buffer + if (!buffer.hasArray()) { + return super.readBytesReference(); + } + int size = readVInt(); + BytesHolder bytes = new BytesHolder(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size); + buffer.skipBytes(size); + return bytes; + } + /** * Returns the number of read bytes by this stream so far. */ diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 7b98998e8da..bd2f1afb8e6 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -137,7 +137,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { process(context, channel, cumulation, dataLen); } - // TODO: we can potentially create a cumulation buffer cache, pop/push style if (!cumulation.readable()) { this.cumulation = null; } @@ -180,6 +179,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { int markedReaderIndex = buffer.readerIndex(); int expectedIndexReader = markedReaderIndex + size; + // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh + // buffer, or in the cumlation buffer, which is cleaned each time StreamInput streamIn = new ChannelBufferStreamInput(buffer, size); long requestId = buffer.readLong();