enhance StreamInput to allow reading a bytes ref, and have it read directly from netty buffers / bytes; apply it to index/percolate/search/count (for now)

Shay Banon 2012-01-08 13:13:42 +02:00
parent 45b5594e9b
commit 0941d157be
14 changed files with 153 additions and 102 deletions
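
At a glance, the commit replaces the hand-rolled length-prefixed byte[] reads and writes in the request classes with the new BytesHolder-based helpers on StreamInput/StreamOutput. The sketch below summarizes that pattern on a hypothetical request class (it is not code from the commit; CountRequest, IndexRequest, PercolateRequest and SearchRequest in the diff follow the same shape):

import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

// Hypothetical request illustrating the serialization pattern applied across
// the index/percolate/search/count requests in this commit.
class ExampleSourceRequest {
    byte[] source;
    int sourceOffset;
    int sourceLength;

    public void readFrom(StreamInput in) throws IOException {
        // before: sourceLength = in.readVInt(); source = new byte[sourceLength]; in.readFully(source);
        // after: a single call that may hand back a reference straight into the transport buffer
        BytesHolder bytes = in.readBytesReference();
        source = bytes.bytes();
        sourceOffset = bytes.offset();
        sourceLength = bytes.length();
    }

    public void writeTo(StreamOutput out) throws IOException {
        // writes vInt(length) followed by the raw bytes, replacing the manual writeVInt/writeBytes pair
        out.writeBytesHolder(source, sourceOffset, sourceLength);
    }
}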

CountRequest.java

@ -25,10 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.*;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -48,7 +45,6 @@ import java.util.Map;
* <p>The request requires the query source to be set either using {@link #query(org.elasticsearch.index.query.QueryBuilder)},
* or {@link #query(byte[])}.
*
*
* @see CountResponse
* @see org.elasticsearch.client.Client#count(CountRequest)
* @see org.elasticsearch.client.Requests#countRequest(String...)
@ -292,11 +288,11 @@ public class CountRequest extends BroadcastOperationRequest {
routing = in.readUTF();
}
BytesHolder bytes = in.readBytesReference();
querySourceUnsafe = false;
querySourceOffset = 0;
querySourceLength = in.readVInt();
querySource = new byte[querySourceLength];
in.readFully(querySource);
querySource = bytes.bytes();
querySourceOffset = bytes.offset();
querySourceLength = bytes.length();
int typesSize = in.readVInt();
if (typesSize > 0) {
@ -325,8 +321,7 @@ public class CountRequest extends BroadcastOperationRequest {
out.writeUTF(routing);
}
out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
out.writeBytesHolder(querySource, querySourceOffset, querySourceLength);
out.writeVInt(types.length);
for (String type : types) {

ShardCountRequest.java

@ -20,6 +20,7 @@
package org.elasticsearch.action.count;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -29,8 +30,6 @@ import java.io.IOException;
/**
* Internal count request executed directly against a specific index shard.
*
*
*/
class ShardCountRequest extends BroadcastShardOperationRequest {
@ -87,10 +86,12 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minScore = in.readFloat();
querySourceLength = in.readVInt();
querySourceOffset = 0;
querySource = new byte[querySourceLength];
in.readFully(querySource);
BytesHolder bytes = in.readBytesHolder();
querySource = bytes.bytes();
querySourceOffset = bytes.offset();
querySourceLength = bytes.length();
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
@ -111,8 +112,9 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(minScore);
out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
out.writeBytesHolder(querySource, querySourceOffset, querySourceLength);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

IndexRequest.java

@ -32,10 +32,7 @@ import org.elasticsearch.action.support.replication.ShardReplicationOperationReq
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -65,7 +62,6 @@ import static org.elasticsearch.action.Actions.addValidationError;
* <p/>
* <p>If the {@link #id(String)} is not set, it will be automatically generated.
*
*
* @see IndexResponse
* @see org.elasticsearch.client.Requests#indexRequest(String)
* @see org.elasticsearch.client.Client#index(IndexRequest)
@ -334,14 +330,23 @@ public class IndexRequest extends ShardReplicationOperationRequest {
}
public byte[] underlyingSource() {
if (sourceUnsafe) {
source();
}
return this.source;
}
public int underlyingSourceOffset() {
if (sourceUnsafe) {
source();
}
return this.sourceOffset;
}
public int underlyingSourceLength() {
if (sourceUnsafe) {
source();
}
return this.sourceLength;
}
@ -692,11 +697,11 @@ public class IndexRequest extends ShardReplicationOperationRequest {
timestamp = in.readUTF();
}
ttl = in.readLong();
BytesHolder bytes = in.readBytesReference();
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
source = bytes.bytes();
sourceOffset = bytes.offset();
sourceLength = bytes.length();
opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
@ -736,8 +741,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeUTF(timestamp);
}
out.writeLong(ttl);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeBytesHolder(source, sourceOffset, sourceLength);
out.writeByte(opType.id());
out.writeBoolean(refresh);
out.writeLong(version);

PercolateRequest.java

@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.stream.StreamInput;
@ -103,14 +104,23 @@ public class PercolateRequest extends SingleCustomOperationRequest {
}
public byte[] underlyingSource() {
if (sourceUnsafe) {
source();
}
return this.source;
}
public int underlyingSourceOffset() {
if (sourceUnsafe) {
source();
}
return this.sourceOffset;
}
public int underlyingSourceLength() {
if (sourceUnsafe) {
source();
}
return this.sourceLength;
}
@ -202,11 +212,11 @@ public class PercolateRequest extends SingleCustomOperationRequest {
index = in.readUTF();
type = in.readUTF();
BytesHolder bytes = in.readBytesReference();
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
source = bytes.bytes();
sourceOffset = bytes.offset();
sourceLength = bytes.length();
}
@Override
@ -214,8 +224,6 @@ public class PercolateRequest extends SingleCustomOperationRequest {
super.writeTo(out);
out.writeUTF(index);
out.writeUTF(type);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeBytesHolder(source, sourceOffset, sourceLength);
}
}

SearchRequest.java

@ -25,7 +25,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
@ -56,7 +56,6 @@ import static org.elasticsearch.search.Scroll.readScroll;
* <p/>
* <p>There is an option to specify an additional search source using the {@link #extraSource(org.elasticsearch.search.builder.SearchSourceBuilder)}.
*
*
* @see org.elasticsearch.client.Requests#searchRequest(String...)
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
@ -551,25 +550,17 @@ public class SearchRequest implements ActionRequest {
timeout = readTimeValue(in);
}
BytesHolder bytes = in.readBytesReference();
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
if (sourceLength == 0) {
source = Bytes.EMPTY_ARRAY;
} else {
source = new byte[sourceLength];
in.readFully(source);
}
source = bytes.bytes();
sourceOffset = bytes.offset();
sourceLength = bytes.length();
bytes = in.readBytesReference();
extraSourceUnsafe = false;
extraSourceOffset = 0;
extraSourceLength = in.readVInt();
if (extraSourceLength == 0) {
extraSource = Bytes.EMPTY_ARRAY;
} else {
extraSource = new byte[extraSourceLength];
in.readFully(extraSource);
}
extraSource = bytes.bytes();
extraSourceOffset = bytes.offset();
extraSourceLength = bytes.length();
int typesSize = in.readVInt();
if (typesSize > 0) {
@ -621,18 +612,8 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
timeout.writeTo(out);
}
if (source == null) {
out.writeVInt(0);
} else {
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
}
if (extraSource == null) {
out.writeVInt(0);
} else {
out.writeVInt(extraSourceLength);
out.writeBytes(extraSource, extraSourceOffset, extraSourceLength);
}
out.writeBytesHolder(source, sourceOffset, sourceLength);
out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

BytesHolder.java

@ -28,6 +28,8 @@ import java.util.Arrays;
public class BytesHolder implements Streamable {
public static final BytesHolder EMPTY = new BytesHolder(Bytes.EMPTY_ARRAY, 0, 0);
private byte[] bytes;
private int offset;
private int length;
@ -75,7 +77,7 @@ public class BytesHolder implements Streamable {
offset = 0;
length = in.readVInt();
bytes = new byte[length];
in.readFully(bytes);
in.readBytes(bytes, 0, length);
}
@Override

AdapterStreamInput.java

@ -1,5 +1,7 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.BytesHolder;
import java.io.IOException;
/**
@ -29,6 +31,11 @@ public abstract class AdapterStreamInput extends StreamInput {
in.readBytes(b, offset, len);
}
@Override
public BytesHolder readBytesReference() throws IOException {
return in.readBytesReference();
}
@Override
public void reset() throws IOException {
in.reset();

AdapterStreamOutput.java

@ -19,6 +19,8 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.BytesHolder;
import java.io.IOException;
/**
@ -74,6 +76,16 @@ public class AdapterStreamOutput extends StreamOutput {
out.writeBytes(b, length);
}
@Override
public void writeBytesHolder(byte[] bytes, int offset, int length) throws IOException {
out.writeBytesHolder(bytes, offset, length);
}
@Override
public void writeBytesHolder(BytesHolder bytes) throws IOException {
out.writeBytesHolder(bytes);
}
@Override
public void writeInt(int i) throws IOException {
out.writeInt(i);

BytesStreamInput.java

@ -19,6 +19,8 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.BytesHolder;
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
@ -44,6 +46,14 @@ public class BytesStreamInput extends StreamInput {
this.count = Math.min(offset + length, buf.length);
}
@Override
public BytesHolder readBytesReference() throws IOException {
int size = readVInt();
BytesHolder bytes = new BytesHolder(buf, pos, size);
pos += size;
return bytes;
}
@Override
public long skip(long n) throws IOException {
if (pos + n > count) {

StreamInput.java

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import java.io.IOException;
@ -51,6 +52,27 @@ public abstract class StreamInput extends InputStream {
*/
public abstract void readBytes(byte[] b, int offset, int len) throws IOException;
/**
* Reads a fresh copy of the bytes.
*/
public BytesHolder readBytesHolder() throws IOException {
int size = readVInt();
if (size == 0) {
return BytesHolder.EMPTY;
}
byte[] bytes = new byte[size];
readBytes(bytes, 0, size);
return new BytesHolder(bytes, 0, size);
}
/**
* Reads a bytes reference from this stream, which might hold an actual reference to the underlying
* bytes of the stream.
*/
public BytesHolder readBytesReference() throws IOException {
return readBytesHolder();
}
public void readFully(byte[] b) throws IOException {
readBytes(b, 0, b.length);
}
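
A minimal sketch of how the two read variants above differ, using the BytesStreamInput(buf, offset, length) constructor and the readBytesReference override shown earlier in this commit (the wrapper class and the 'wire' array are illustrative only; 'wire' is assumed to start with a well-formed vInt length prefix):

import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.BytesStreamInput;

import java.io.IOException;

// readBytesHolder() always materializes a fresh copy; readBytesReference() may alias
// the stream's own buffer, so callers must honor offset()/length() instead of assuming 0.
class ReadVariantsSketch {
    static void read(byte[] wire) throws IOException {
        BytesHolder copy = new BytesStreamInput(wire, 0, wire.length).readBytesHolder();
        BytesHolder view = new BytesStreamInput(wire, 0, wire.length).readBytesReference();

        assert copy.offset() == 0;   // copied into a freshly allocated array
        assert view.bytes() == wire; // points back into 'wire'; nothing was copied
    }
}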

StreamOutput.java

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import java.io.IOException;
@ -70,6 +71,16 @@ public abstract class StreamOutput extends OutputStream {
*/
public abstract void writeBytes(byte[] b, int offset, int length) throws IOException;
public void writeBytesHolder(byte[] bytes, int offset, int length) throws IOException {
writeVInt(length);
writeBytes(bytes, offset, length);
}
public void writeBytesHolder(BytesHolder bytes) throws IOException {
writeVInt(bytes.length());
writeBytes(bytes.bytes(), bytes.offset(), bytes.length());
}
public final void writeShort(short v) throws IOException {
writeByte((byte) (v >> 8));
writeByte((byte) v);

InternalSearchRequest.java

@ -21,7 +21,7 @@ package org.elasticsearch.search.internal;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -50,8 +50,6 @@ import static org.elasticsearch.search.Scroll.readScroll;
* }
* }
* </pre>
*
*
*/
public class InternalSearchRequest implements Streamable {
@ -208,22 +206,17 @@ public class InternalSearchRequest implements Streamable {
if (in.readBoolean()) {
timeout = readTimeValue(in);
}
sourceOffset = 0;
sourceLength = in.readVInt();
if (sourceLength == 0) {
source = Bytes.EMPTY_ARRAY;
} else {
source = new byte[sourceLength];
in.readFully(source);
}
extraSourceOffset = 0;
extraSourceLength = in.readVInt();
if (extraSourceLength == 0) {
extraSource = Bytes.EMPTY_ARRAY;
} else {
extraSource = new byte[extraSourceLength];
in.readFully(extraSource);
}
BytesHolder bytes = in.readBytesReference();
source = bytes.bytes();
sourceOffset = bytes.offset();
sourceLength = bytes.length();
bytes = in.readBytesReference();
extraSource = bytes.bytes();
extraSourceOffset = bytes.offset();
extraSourceLength = bytes.length();
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
@ -261,18 +254,8 @@ public class InternalSearchRequest implements Streamable {
out.writeBoolean(true);
timeout.writeTo(out);
}
if (source == null) {
out.writeVInt(0);
} else {
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
}
if (extraSource == null) {
out.writeVInt(0);
} else {
out.writeVInt(extraSourceLength);
out.writeBytes(extraSource, extraSourceOffset, extraSourceLength);
}
out.writeBytesHolder(source, sourceOffset, sourceLength);
out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

ChannelBufferStreamInput.java

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
@ -27,8 +28,6 @@ import java.io.IOException;
/**
* A Netty {@link org.jboss.netty.buffer.ChannelBuffer} based {@link org.elasticsearch.common.io.stream.StreamInput}.
*
*
*/
public class ChannelBufferStreamInput extends StreamInput {
@ -46,6 +45,20 @@ public class ChannelBufferStreamInput extends StreamInput {
buffer.markReaderIndex();
}
@Override
public BytesHolder readBytesReference() throws IOException {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulation buffer, which is cleaned each time
// so: we can actually return a reference if this is an array backed buffer
if (!buffer.hasArray()) {
return super.readBytesReference();
}
int size = readVInt();
BytesHolder bytes = new BytesHolder(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), size);
buffer.skipBytes(size);
return bytes;
}
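
To make the zero-copy path above concrete, here is a minimal sketch that wraps an array-backed netty 3.x ChannelBuffer and reads a reference out of it, using the (buffer, size) constructor that MessageChannelHandler uses below; the wireBytes argument and the wrapper class are illustrative, not part of the commit:

import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.transport.netty.ChannelBufferStreamInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;

class ZeroCopyReadSketch {
    static BytesHolder read(byte[] wireBytes) throws IOException {
        // wrappedBuffer() is array backed, so hasArray() is true and the fast path above applies
        ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(wireBytes);
        ChannelBufferStreamInput in = new ChannelBufferStreamInput(buffer, wireBytes.length);
        // the returned holder points into wireBytes at the current reader index; no copy is made
        return in.readBytesReference();
    }
}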
/**
* Returns the number of read bytes by this stream so far.
*/

MessageChannelHandler.java

@ -137,7 +137,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
process(context, channel, cumulation, dataLen);
}
// TODO: we can potentially create a cumulation buffer cache, pop/push style
if (!cumulation.readable()) {
this.cumulation = null;
}
@ -180,6 +179,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulation buffer, which is cleaned each time
StreamInput streamIn = new ChannelBufferStreamInput(buffer, size);
long requestId = buffer.readLong();