move rest request to provide content as BytesReference

this allows for better buffer usage, specifically when forwarding requests to other nodes
This commit is contained in:
Shay Banon 2012-07-07 22:49:31 +02:00
parent 5f5458fd56
commit f7b538e17f
54 changed files with 149 additions and 170 deletions

View File

@ -96,7 +96,7 @@ class ShardValidateQueryRequest extends BroadcastShardOperationRequest {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -235,7 +235,7 @@ public class ValidateQueryRequest extends BroadcastOperationRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -301,7 +301,7 @@ public class CountRequest extends BroadcastOperationRequest {
out.writeUTF(routing);
}
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -100,7 +100,7 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
super.writeTo(out);
out.writeFloat(minScore);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -283,7 +283,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
if (routing == null) {
out.writeBoolean(false);

View File

@ -118,7 +118,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

View File

@ -123,7 +123,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(querySource, true);
out.writeBytesReference(querySource);
out.writeVInt(shardId);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -234,7 +236,11 @@ public class MultiGetRequest implements ActionRequest {
}
public void add(@Nullable String defaultIndex, @Nullable String defaultType, @Nullable String[] defaultFields, byte[] data, int from, int length) throws Exception {
XContentParser parser = XContentFactory.xContent(data, from, length).createParser(data, from, length);
add(defaultIndex, defaultType, defaultFields, new BytesArray(data, from, length));
}
public void add(@Nullable String defaultIndex, @Nullable String defaultType, @Nullable String[] defaultFields, BytesReference data) throws Exception {
XContentParser parser = XContentFactory.xContent(data).createParser(data);
try {
XContentParser.Token token;
String currentFieldName = null;

View File

@ -717,7 +717,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeUTF(timestamp);
}
out.writeLong(ttl);
out.writeBytesReference(source, true);
out.writeBytesReference(source);
out.writeByte(opType.id());
out.writeBoolean(refresh);
out.writeLong(version);

View File

@ -661,7 +661,7 @@ public class MoreLikeThisRequest implements ActionRequest {
out.writeBoolean(true);
searchScroll.writeTo(out);
}
out.writeBytesReference(searchSource, true);
out.writeBytesReference(searchSource);
out.writeVInt(searchSize);
out.writeVInt(searchFrom);

View File

@ -186,6 +186,6 @@ public class PercolateRequest extends SingleCustomOperationRequest {
super.writeTo(out);
out.writeUTF(index);
out.writeUTF(type);
out.writeBytesReference(source, true);
out.writeBytesReference(source);
}
}

View File

@ -533,8 +533,8 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
scroll.writeTo(out);
}
out.writeBytesReference(source, true);
out.writeBytesReference(extraSource, true);
out.writeBytesReference(source);
out.writeBytesReference(extraSource);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

View File

@ -19,14 +19,19 @@
package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.jboss.netty.util.CharsetUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
/**
*/
@ -61,14 +66,6 @@ public class ByteBufferBytesReference implements BytesReference {
return new ByteBufferStreamInput(buffer);
}
@Override
public void writeTo(StreamOutput out, boolean withLength) throws IOException {
if (withLength) {
out.writeVInt(length());
}
writeTo(out);
}
@Override
public void writeTo(OutputStream os) throws IOException {
if (buffer.hasArray()) {
@ -120,4 +117,27 @@ public class ByteBufferBytesReference implements BytesReference {
public int arrayOffset() {
return buffer.arrayOffset() + buffer.position();
}
@Override
public String toUtf8() {
if (!buffer.hasRemaining()) {
return "";
}
final CharsetDecoder decoder = CharsetUtil.getDecoder(Charsets.UTF_8);
final CharBuffer dst = CharBuffer.allocate(
(int) ((double) buffer.remaining() * decoder.maxCharsPerByte()));
try {
CoderResult cr = decoder.decode(buffer, dst, true);
if (!cr.isUnderflow()) {
cr.throwException();
}
cr = decoder.flush(dst);
if (!cr.isUnderflow()) {
cr.throwException();
}
} catch (CharacterCodingException x) {
throw new IllegalStateException(x);
}
return dst.flip().toString();
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.io.OutputStream;
@ -77,14 +76,6 @@ public class BytesArray implements BytesReference {
return new BytesStreamInput(bytes, offset, length, false);
}
@Override
public void writeTo(StreamOutput out, boolean withLength) throws IOException {
if (withLength) {
out.writeVInt(length);
}
out.writeBytes(bytes, offset, length);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write(bytes, offset, length);
@ -123,6 +114,14 @@ public class BytesArray implements BytesReference {
return offset;
}
@Override
public String toUtf8() {
if (length == 0) {
return "";
}
return new String(bytes, offset, length, Charsets.UTF_8);
}
@Override
public boolean equals(Object obj) {
return bytesEquals((BytesArray) obj);

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.bytes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.io.OutputStream;
@ -51,10 +50,8 @@ public interface BytesReference {
StreamInput streamInput();
/**
* Writes the bytes into the output, with an optional length header (variable encoded).
* Writes the bytes directly to the output stream.
*/
void writeTo(StreamOutput out, boolean withLength) throws IOException;
void writeTo(OutputStream os) throws IOException;
/**
@ -86,4 +83,9 @@ public interface BytesReference {
* The offset into the underlying byte array.
*/
int arrayOffset();
/**
* Converts to a string based on utf8.
*/
String toUtf8();
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.jboss.netty.buffer.ChannelBuffer;
@ -57,14 +57,6 @@ public class ChannelBufferBytesReference implements BytesReference {
return ChannelBufferStreamInputFactory.create(buffer.duplicate());
}
@Override
public void writeTo(StreamOutput out, boolean withLength) throws IOException {
if (withLength) {
out.writeVInt(buffer.readableBytes());
}
buffer.getBytes(buffer.readerIndex(), out, length());
}
@Override
public void writeTo(OutputStream os) throws IOException {
buffer.getBytes(buffer.readerIndex(), os, length());
@ -104,4 +96,9 @@ public class ChannelBufferBytesReference implements BytesReference {
public int arrayOffset() {
return buffer.arrayOffset() + buffer.readerIndex();
}
@Override
public String toUtf8() {
return buffer.toString(Charsets.UTF_8);
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.common.bytes;
import com.google.common.base.Charsets;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.io.OutputStream;
@ -71,14 +71,6 @@ public class HashedBytesArray implements BytesReference {
return new BytesStreamInput(bytes, false);
}
@Override
public void writeTo(StreamOutput out, boolean withLength) throws IOException {
if (withLength) {
out.writeVInt(bytes.length);
}
out.writeBytes(bytes);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write(bytes);
@ -116,6 +108,14 @@ public class HashedBytesArray implements BytesReference {
return 0;
}
@Override
public String toUtf8() {
if (bytes.length == 0) {
return "";
}
return new String(bytes, Charsets.UTF_8);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -36,11 +36,6 @@ public abstract class AdapterStreamInput extends StreamInput {
return in.readBytesReference();
}
@Override
public BytesReference readBytesReference(int length) throws IOException {
return in.readBytesReference(length);
}
@Override
public void reset() throws IOException {
in.reset();

View File

@ -78,8 +78,8 @@ public class AdapterStreamOutput extends StreamOutput {
}
@Override
public void writeBytesReference(@Nullable BytesReference bytes, boolean withLength) throws IOException {
out.writeBytesReference(bytes, withLength);
public void writeBytesReference(@Nullable BytesReference bytes) throws IOException {
out.writeBytesReference(bytes);
}
@Override

View File

@ -60,9 +60,13 @@ public class BytesStreamInput extends StreamInput {
}
@Override
public BytesReference readBytesReference(int length) throws IOException {
public BytesReference readBytesReference() throws IOException {
if (unsafe) {
return super.readBytesReference(length);
return super.readBytesReference();
}
int length = readVInt();
if (length == 0) {
return BytesArray.EMPTY;
}
BytesArray bytes = new BytesArray(buf, pos, length);
pos += length;

View File

@ -53,14 +53,7 @@ public abstract class StreamInput extends InputStream {
* bytes of the stream.
*/
public BytesReference readBytesReference() throws IOException {
return readBytesReference(readVInt());
}
/**
* Reads a bytes reference from this stream, might hold an actual reference to the underlying
* bytes of the stream.
*/
public BytesReference readBytesReference(int length) throws IOException {
int length = readVInt();
if (length == 0) {
return BytesArray.EMPTY;
}

View File

@ -68,14 +68,16 @@ public abstract class StreamOutput extends OutputStream {
*/
public abstract void writeBytes(byte[] b, int offset, int length) throws IOException;
public void writeBytesReference(@Nullable BytesReference bytes, boolean withLength) throws IOException {
/**
* Writes the bytes reference, including a length header.
*/
public void writeBytesReference(@Nullable BytesReference bytes) throws IOException {
if (bytes == null) {
if (withLength) {
writeVInt(0);
}
writeVInt(0);
return;
}
bytes.writeTo(this, withLength);
writeVInt(bytes.length());
bytes.writeTo(this);
}
public final void writeShort(short v) throws IOException {

View File

@ -117,7 +117,7 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(clusterStateInBytes, true);
out.writeBytesReference(clusterStateInBytes);
}
}

View File

@ -19,7 +19,9 @@
package org.elasticsearch.http.netty;
import com.google.common.base.Charsets;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.support.AbstractRestRequest;
import org.elasticsearch.rest.support.RestUtils;
@ -39,14 +41,16 @@ public class NettyHttpRequest extends AbstractRestRequest implements HttpRequest
private final String rawPath;
private final int contentLength;
private byte[] contentAsBytes;
private final BytesReference content;
public NettyHttpRequest(org.jboss.netty.handler.codec.http.HttpRequest request) {
this.request = request;
this.params = new HashMap<String, String>();
this.contentLength = request.getContent().readableBytes();
if (request.getContent().readable()) {
this.content = new ChannelBufferBytesReference(request.getContent());
} else {
this.content = BytesArray.EMPTY;
}
String uri = request.getUri();
int pathEndPos = uri.indexOf('?');
@ -101,58 +105,18 @@ public class NettyHttpRequest extends AbstractRestRequest implements HttpRequest
@Override
public boolean hasContent() {
return contentLength > 0;
}
@Override
public int contentLength() {
return contentLength;
return content.length() > 0;
}
@Override
public boolean contentUnsafe() {
// if its a copy, then its not unsafe...
if (contentAsBytes != null) {
return false;
}
// HttpMessageDecoder#content is sliced but out of freshly created buffers for each read
// Netty http decoder always copies over the http content
return false;
//return request.getContent().hasArray();
}
@Override
public byte[] contentByteArray() {
if (contentAsBytes != null) {
return contentAsBytes;
}
if (request.getContent().hasArray()) {
return request.getContent().array();
}
contentAsBytes = new byte[request.getContent().readableBytes()];
request.getContent().getBytes(request.getContent().readerIndex(), contentAsBytes);
// clear the content, so it can be GC'ed, we make sure to work from contentAsBytes from here on
request.setContent(null);
return contentAsBytes;
}
@Override
public int contentByteArrayOffset() {
if (contentAsBytes != null) {
return 0;
}
if (request.getContent().hasArray()) {
// get the array offset, and the reader index offset within it
return request.getContent().arrayOffset() + request.getContent().readerIndex();
}
return 0;
}
@Override
public String contentAsString() {
if (contentAsBytes != null) {
return new String(contentAsBytes, Charsets.UTF_8);
}
return request.getContent().toString(Charsets.UTF_8);
public BytesReference content() {
return content;
}
@Override

View File

@ -345,7 +345,7 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
out.writeLong(version);
out.writeBoolean(exists);
if (exists) {
out.writeBytesReference(source, true);
out.writeBytesReference(source);
if (fields == null) {
out.writeVInt(0);
} else {

View File

@ -278,7 +278,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
builder.close();
} else {
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
streamOutput.writeBytesReference(source, false);
source.writeTo(streamOutput);
streamOutput.close();
}
// we copy over the byte array, since we need to push back the cached entry

View File

@ -350,7 +350,7 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeBytesReference(source, true);
out.writeBytesReference(source);
if (routing == null) {
out.writeBoolean(false);
} else {
@ -479,7 +479,7 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(5); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeBytesReference(source, true);
out.writeBytesReference(source);
if (routing == null) {
out.writeBoolean(false);
} else {
@ -632,7 +632,7 @@ public interface Translog extends IndexShardComponent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(2); // version
out.writeBytesReference(source, true);
out.writeBytesReference(source);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

View File

@ -108,7 +108,7 @@ class RecoveryFileChunkRequest implements Streamable {
out.writeBoolean(true);
out.writeUTF(checksum);
}
out.writeBytesReference(content, true);
out.writeBytesReference(content);
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -58,13 +59,7 @@ public interface RestRequest extends ToXContent.Params {
*/
boolean contentUnsafe();
byte[] contentByteArray();
int contentByteArrayOffset();
int contentLength();
String contentAsString();
BytesReference content();
String header(String name);

View File

@ -27,7 +27,6 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.*;
import java.io.IOException;
@ -48,10 +47,7 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.listenerThreaded(false);
try {
XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
Map<String, Object> source = XContentFactory.xContent(xContentType)
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose();
Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose();
if (source.containsKey("transient")) {
clusterUpdateSettingsRequest.transientSettings((Map) source.get("transient"));
}

View File

@ -56,6 +56,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.listenerThreaded(false);
XContentParser parser = null;
try {
// {
// actions : [
@ -64,8 +65,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
// ]
// }
indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
XContentParser parser = XContentFactory.xContent(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength())
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
parser = XContentFactory.xContent(request.content()).createParser(request.content());
XContentParser.Token token = parser.nextToken();
if (token == null) {
throw new ElasticSearchIllegalArgumentException("No action is specified");
@ -149,6 +149,8 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
logger.warn("Failed to send response", e1);
}
return;
} finally {
parser.close();
}
client.admin().indices().aliases(indicesAliasesRequest, new ActionListener<IndicesAliasesResponse>() {
@Override

View File

@ -54,7 +54,7 @@ public class RestAnalyzeAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
String text = request.param("text");
if (text == null && request.hasContent()) {
text = request.contentAsString();
text = request.content().toUtf8();
}
if (text == null) {
try {

View File

@ -53,7 +53,7 @@ public class RestCreateIndexAction extends BaseRestHandler {
createIndexRequest.listenerThreaded(false);
if (request.hasContent()) {
try {
createIndexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
createIndexRequest.source(request.content());
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));

View File

@ -58,7 +58,7 @@ public class RestPutMappingAction extends BaseRestHandler {
PutMappingRequest putMappingRequest = putMappingRequest(splitIndices(request.param("index")));
putMappingRequest.listenerThreaded(false);
putMappingRequest.type(request.param("type"));
putMappingRequest.source(request.contentAsString());
putMappingRequest.source(request.content().toUtf8());
putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
putMappingRequest.ignoreConflicts(request.paramAsBoolean("ignore_conflicts", putMappingRequest.ignoreConflicts()));
client.admin().indices().putMapping(putMappingRequest, new ActionListener<PutMappingResponse>() {

View File

@ -58,7 +58,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
updateSettingsRequest.listenerThreaded(false);
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
String bodySettings = request.contentAsString();
String bodySettings = request.content().toUtf8();
if (Strings.hasText(bodySettings)) {
try {
updateSettings.put(ImmutableSettings.settingsBuilder().loadFromSource(bodySettings).build());

View File

@ -67,7 +67,7 @@ public class RestPutIndexTemplateAction extends BaseRestHandler {
putRequest.create(request.paramAsBoolean("create", false));
putRequest.cause(request.param("cause", ""));
putRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
putRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
putRequest.source(request.content());
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));

View File

@ -70,7 +70,7 @@ public class RestValidateQueryAction extends BaseRestHandler {
}
validateQueryRequest.operationThreading(operationThreading);
if (request.hasContent()) {
validateQueryRequest.query(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), true);
validateQueryRequest.query(request.content(), request.contentUnsafe());
} else {
String source = request.param("source");
if (source != null) {

View File

@ -53,7 +53,7 @@ public class RestPutWarmerAction extends BaseRestHandler {
putWarmerRequest.listenerThreaded(false);
SearchRequest searchRequest = new SearchRequest(RestActions.splitIndices(request.param("index")))
.types(RestActions.splitTypes(request.param("type")))
.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
.source(request.content(), request.contentUnsafe());
putWarmerRequest.searchRequest(searchRequest);
client.admin().indices().putWarmer(putWarmerRequest, new ActionListener<PutWarmerResponse>() {
@Override

View File

@ -82,7 +82,7 @@ public class RestBulkAction extends BaseRestHandler {
}
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
try {
bulkRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe(), defaultIndex, defaultType);
bulkRequest.add(request.content(), request.contentUnsafe(), defaultIndex, defaultType);
} catch (Exception e) {
try {
XContentBuilder builder = restContentBuilder(request);

View File

@ -70,7 +70,7 @@ public class RestCountAction extends BaseRestHandler {
}
countRequest.operationThreading(operationThreading);
if (request.hasContent()) {
countRequest.query(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), true);
countRequest.query(request.content(), request.contentUnsafe());
} else {
String source = request.param("source");
if (source != null) {

View File

@ -61,7 +61,7 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
deleteByQueryRequest.listenerThreaded(false);
try {
if (request.hasContent()) {
deleteByQueryRequest.query(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
deleteByQueryRequest.query(request.content(), request.contentUnsafe());
} else {
String source = request.param("source");
if (source != null) {

View File

@ -65,7 +65,7 @@ public class RestMultiGetAction extends BaseRestHandler {
}
try {
multiGetRequest.add(request.param("index"), request.param("type"), sFields, request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
multiGetRequest.add(request.param("index"), request.param("type"), sFields, request.content());
} catch (Exception e) {
try {
XContentBuilder builder = restContentBuilder(request);

View File

@ -74,7 +74,7 @@ public class RestIndexAction extends BaseRestHandler {
if (request.hasParam("ttl")) {
indexRequest.ttl(request.paramAsTime("ttl", null).millis());
}
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.source(request.content(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
indexRequest.version(RestActions.parseVersion(request));

View File

@ -79,7 +79,7 @@ public class RestMoreLikeThisAction extends BaseRestHandler {
mltRequest.searchScroll(new Scroll(parseTimeValue(searchScroll, null)));
}
if (request.hasContent()) {
mltRequest.searchSource(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
mltRequest.searchSource(request.content(), request.contentUnsafe());
} else {
String searchSource = request.param("search_source");
if (searchSource != null) {

View File

@ -52,7 +52,7 @@ public class RestPercolateAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
PercolateRequest percolateRequest = new PercolateRequest(request.param("index"), request.param("type"));
percolateRequest.listenerThreaded(false);
percolateRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
percolateRequest.source(request.content(), request.contentUnsafe());
// we just send a response, no need to fork
percolateRequest.listenerThreaded(false);

View File

@ -62,8 +62,7 @@ public class RestMultiSearchAction extends BaseRestHandler {
String[] types = RestActions.splitTypes(request.param("type"));
try {
multiSearchRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe(),
indices, types, request.param("search_type"));
multiSearchRequest.add(request.content(), request.contentUnsafe(), indices, types, request.param("search_type"));
} catch (Exception e) {
try {
XContentBuilder builder = restContentBuilder(request);

View File

@ -120,7 +120,7 @@ public class RestSearchAction extends BaseRestHandler {
SearchRequest searchRequest = new SearchRequest(indices);
// get the content, and put it in the body
if (request.hasContent()) {
searchRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
searchRequest.source(request.content(), request.contentUnsafe());
} else {
String source = request.param("source");
if (source != null) {

View File

@ -57,7 +57,7 @@ public class RestSearchScrollAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
String scrollId = request.param("scroll_id");
if (scrollId == null && request.hasContent()) {
scrollId = request.contentAsString();
scrollId = request.content().toUtf8();
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchScrollRequest.listenerThreaded(false);

View File

@ -39,7 +39,7 @@ public class RestXContentBuilder {
if (contentType == null) {
// try and guess it from the body, if exists
if (request.hasContent()) {
contentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
contentType = XContentFactory.xContentType(request.content());
}
}
if (contentType == null) {

View File

@ -89,7 +89,7 @@ public class RestUpdateAction extends BaseRestHandler {
// see if we have it in the body
if (request.hasContent()) {
try {
updateRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
updateRequest.source(request.content());
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.routing(request.param("routing"));

View File

@ -606,7 +606,7 @@ public class InternalSearchHit implements SearchHit {
out.writeUTF(id);
out.writeUTF(type);
out.writeLong(version);
out.writeBytesReference(source, true);
out.writeBytesReference(source);
if (explanation == null) {
out.writeBoolean(false);
} else {

View File

@ -197,8 +197,8 @@ public class InternalSearchRequest implements Streamable {
out.writeBoolean(true);
scroll.writeTo(out);
}
out.writeBytesReference(source, true);
out.writeBytesReference(extraSource, true);
out.writeBytesReference(source);
out.writeBytesReference(extraSource);
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);

View File

@ -114,7 +114,7 @@ public class IndexWarmersMetaData implements IndexMetaData.Custom {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeBytesReference(entry.source(), true);
out.writeBytesReference(entry.source());
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -51,7 +52,11 @@ public class ChannelBufferStreamInput extends StreamInput {
}
@Override
public BytesReference readBytesReference(int length) throws IOException {
public BytesReference readBytesReference() throws IOException {
int length = readVInt();
if (length == 0) {
return BytesArray.EMPTY;
}
ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length));
buffer.skipBytes(length);
return ref;