From 836461e6de36c3218d9a7d90712370f1523057bb Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 13 May 2010 18:39:24 +0300 Subject: [PATCH] improve search serialization and deserialization --- .../mlt/TransportMoreLikeThisAction.java | 4 +- .../action/search/SearchRequest.java | 170 +++++++++++++----- .../search/type/TransportSearchHelper.java | 5 +- .../type/TransportSearchTypeAction.java | 4 + .../TransportBroadcastOperationAction.java | 5 +- .../elasticsearch/search/SearchService.java | 12 +- .../internal/InternalSearchRequest.java | 64 +++++-- .../SingleInstanceEmbeddedSearchTests.java | 2 +- .../TwoInstanceEmbeddedSearchTests.java | 2 +- ...ceUnbalancedShardsEmbeddedSearchTests.java | 2 +- 10 files changed, 200 insertions(+), 70 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java index 84161ace755..3c736dbfbe2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java @@ -166,12 +166,14 @@ public class TransportMoreLikeThisAction extends BaseAction() { @Override public void onResponse(SearchResponse response) { listener.onResponse(response); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 15aed2010b9..845c64aad56 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequest; @@ -36,8 +37,10 @@ import org.elasticsearch.util.io.stream.StreamOutput; import org.elasticsearch.util.xcontent.XContentFactory; import org.elasticsearch.util.xcontent.XContentType; import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder; +import org.elasticsearch.util.xcontent.builder.XContentBuilder; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import static org.elasticsearch.action.Actions.*; @@ -69,8 +72,14 @@ public class SearchRequest implements ActionRequest { private String queryHint; private byte[] source; + private int sourceOffset; + private int sourceLength; + private boolean sourceUnsafe; private byte[] extraSource; + private int extraSourceOffset; + private int extraSourceLength; + private boolean extraSourceUnsafe; private Scroll scroll; @@ -81,9 +90,6 @@ public class SearchRequest implements ActionRequest { private boolean listenerThreaded = false; private SearchOperationThreading operationThreading = SearchOperationThreading.SINGLE_THREAD; - private transient SearchSourceBuilder sourceBuilder; - private transient SearchSourceBuilder extraSourceBuilder; - SearchRequest() { } @@ -105,12 +111,28 @@ public class SearchRequest implements ActionRequest { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (source == null && sourceBuilder == null && extraSource == null && extraSourceBuilder == null) { + if (source == null && extraSource == null) { validationException = addValidationError("search source is missing", validationException); } return validationException; } + /** + * Internal. + */ + public void beforeLocalFork() { + if (source != null && sourceUnsafe) { + source = Arrays.copyOfRange(source, sourceOffset, sourceLength); + sourceOffset = 0; + sourceUnsafe = false; + } + if (extraSource != null && extraSourceUnsafe) { + extraSource = Arrays.copyOfRange(extraSource, extraSourceOffset, extraSourceLength); + extraSourceOffset = 0; + extraSourceUnsafe = false; + } + } + /** * Should the listener be called on a separate thread if needed. */ @@ -195,7 +217,11 @@ public class SearchRequest implements ActionRequest { * The source of the search request. */ public SearchRequest source(SearchSourceBuilder sourceBuilder) { - this.sourceBuilder = sourceBuilder; + FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes(); + this.source = bos.unsafeByteArray(); + this.sourceOffset = 0; + this.sourceLength = bos.size(); + this.sourceUnsafe = true; return this; } @@ -204,7 +230,12 @@ public class SearchRequest implements ActionRequest { * {@link #source(org.elasticsearch.search.builder.SearchSourceBuilder)}. */ public SearchRequest source(String source) { - return source(Unicode.fromStringAsBytes(source)); + UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source); + this.source = result.result; + this.sourceOffset = 0; + this.sourceLength = result.length; + this.sourceUnsafe = true; + return this; } /** @@ -214,18 +245,39 @@ public class SearchRequest implements ActionRequest { try { BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType); builder.map(source); - this.source = builder.copiedBytes(); + return source(builder); } catch (IOException e) { throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e); } - return this; + } + + public SearchRequest source(XContentBuilder builder) { + try { + this.source = builder.unsafeBytes(); + this.sourceOffset = 0; + this.sourceLength = builder.unsafeBytesLength(); + this.sourceUnsafe = true; + return this; + } catch (IOException e) { + throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e); + } } /** * The search source to execute. */ public SearchRequest source(byte[] source) { + return source(source, 0, source.length); + } + + /** + * The search source to execute. + */ + public SearchRequest source(byte[] source, int offset, int length) { this.source = source; + this.sourceOffset = offset; + this.sourceLength = length; + this.sourceUnsafe = false; return this; } @@ -233,17 +285,26 @@ public class SearchRequest implements ActionRequest { * The search source to execute. */ public byte[] source() { - if (source == null && sourceBuilder != null) { - source = sourceBuilder.buildAsBytes(contentType); - } return source; } + public int sourceOffset() { + return sourceOffset; + } + + public int sourceLength() { + return sourceLength; + } + /** * Allows to provide additional source that will be used as well. */ public SearchRequest extraSource(SearchSourceBuilder sourceBuilder) { - this.extraSourceBuilder = sourceBuilder; + FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes(); + this.extraSource = bos.unsafeByteArray(); + this.extraSourceOffset = 0; + this.extraSourceLength = bos.size(); + this.extraSourceUnsafe = true; return this; } @@ -251,25 +312,51 @@ public class SearchRequest implements ActionRequest { try { BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType); builder.map(extraSource); - this.extraSource = builder.copiedBytes(); + return extraSource(builder); } catch (IOException e) { throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e); } - return this; + } + + public SearchRequest extraSource(XContentBuilder builder) { + try { + this.extraSource = builder.unsafeBytes(); + this.extraSourceOffset = 0; + this.extraSourceLength = builder.unsafeBytesLength(); + this.extraSourceUnsafe = true; + return this; + } catch (IOException e) { + throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e); + } } /** * Allows to provide additional source that will use used as well. */ public SearchRequest extraSource(String source) { - return extraSource(Unicode.fromStringAsBytes(source)); + UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source); + this.extraSource = result.result; + this.extraSourceOffset = 0; + this.extraSourceLength = result.length; + this.extraSourceUnsafe = true; + return this; } /** * Allows to provide additional source that will be used as well. */ public SearchRequest extraSource(byte[] source) { + return extraSource(source, 0, source.length); + } + + /** + * Allows to provide additional source that will be used as well. + */ + public SearchRequest extraSource(byte[] source, int offset, int length) { this.extraSource = source; + this.extraSourceOffset = offset; + this.extraSourceLength = length; + this.extraSourceUnsafe = false; return this; } @@ -277,12 +364,17 @@ public class SearchRequest implements ActionRequest { * Additional search source to execute. */ public byte[] extraSource() { - if (extraSource == null && extraSourceBuilder != null) { - extraSource = extraSourceBuilder.buildAsBytes(contentType); - } return this.extraSource; } + public int extraSourceOffset() { + return extraSourceOffset; + } + + public int extraSourceLength() { + return extraSourceLength; + } + /** * The tye of search to execute. */ @@ -368,18 +460,24 @@ public class SearchRequest implements ActionRequest { if (in.readBoolean()) { timeout = readTimeValue(in); } - int size = in.readVInt(); - if (size == 0) { + + sourceUnsafe = false; + sourceOffset = 0; + sourceLength = in.readVInt(); + if (sourceLength == 0) { source = Bytes.EMPTY_ARRAY; } else { - source = new byte[size]; + source = new byte[sourceLength]; in.readFully(source); } - size = in.readVInt(); - if (size == 0) { + + extraSourceUnsafe = false; + extraSourceOffset = 0; + extraSourceLength = in.readVInt(); + if (extraSourceLength == 0) { extraSource = Bytes.EMPTY_ARRAY; } else { - extraSource = new byte[size]; + extraSource = new byte[extraSourceLength]; in.readFully(extraSource); } @@ -420,29 +518,17 @@ public class SearchRequest implements ActionRequest { out.writeBoolean(true); timeout.writeTo(out); } - if (source == null && sourceBuilder == null) { + if (source == null) { out.writeVInt(0); } else { - if (source != null) { - out.writeVInt(source.length); - out.writeBytes(source); - } else { - FastByteArrayOutputStream os = sourceBuilder.buildAsUnsafeBytes(contentType); - out.writeVInt(os.size()); - out.writeBytes(os.unsafeByteArray(), 0, os.size()); - } + out.writeVInt(sourceLength); + out.writeBytes(source, sourceOffset, sourceLength); } - if (extraSource == null && extraSourceBuilder == null) { + if (extraSource == null) { out.writeVInt(0); } else { - if (extraSource != null) { - out.writeVInt(extraSource.length); - out.writeBytes(extraSource); - } else { - FastByteArrayOutputStream os = extraSourceBuilder.buildAsUnsafeBytes(contentType); - out.writeVInt(os.size()); - out.writeBytes(os.unsafeByteArray(), 0, os.size()); - } + out.writeVInt(extraSourceLength); + out.writeBytes(extraSource, extraSourceOffset, extraSourceLength); } out.writeVInt(types.length); for (String type : types) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index 074c4433b0b..b85eb8ff84c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -64,8 +64,9 @@ public abstract class TransportSearchHelper { } public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, SearchRequest request) { - InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, request.source()); - internalRequest.extraSource(request.extraSource()); + InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting); + internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength()); + internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength()); internalRequest.scroll(request.scroll()); internalRequest.timeout(request.timeout()); internalRequest.types(request.types()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index b852e3e0d38..3463e02991c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -132,6 +132,7 @@ public abstract class TransportSearchTypeAction extends BaseAction 0) { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { + request.beforeLocalFork(); threadPool.execute(new Runnable() { @Override public void run() { for (final ShardsIterator shardIt : shardsIts) { @@ -146,6 +147,9 @@ public abstract class TransportSearchTypeAction extends BaseAction 0) { - request.beforeLocalFork(); if (request.operationThreading() == BroadcastOperationThreading.SINGLE_THREAD) { + request.beforeLocalFork(); threadPool.execute(new Runnable() { @Override public void run() { for (final ShardsIterator shardIt : shardsIts) { @@ -179,6 +179,9 @@ public abstract class TransportBroadcastOperationAction { context.scroll(request.scroll()); - parseSource(context, request.source()); - parseSource(context, request.extraSource()); + parseSource(context, request.source(), request.sourceOffset(), request.sourceLength()); + parseSource(context, request.extraSource(), request.extraSourceOffset(), request.extraSourceLength()); // if the from and size are still not set, default them if (context.from() == -1) { @@ -330,13 +330,13 @@ public class SearchService extends AbstractLifecycleComponent { context.release(); } - private void parseSource(SearchContext context, byte[] source) throws SearchParseException { + private void parseSource(SearchContext context, byte[] source, int offset, int length) throws SearchParseException { // nothing to parse... - if (source == null || source.length == 0) { + if (source == null || length == 0) { return; } try { - XContentParser parser = XContentFactory.xContent(source).createParser(source); + XContentParser parser = XContentFactory.xContent(source, offset, length).createParser(source, offset, length); XContentParser.Token token; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -352,7 +352,7 @@ public class SearchService extends AbstractLifecycleComponent { } } } catch (Exception e) { - throw new SearchParseException(context, "Failed to parse [" + Unicode.fromBytes(source) + "]", e); + throw new SearchParseException(context, "Failed to parse [" + Unicode.fromBytes(source, offset, length) + "]", e); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java index c18e4c1eca0..f813733083e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java @@ -66,20 +66,23 @@ public class InternalSearchRequest implements Streamable { private String[] types = Strings.EMPTY_ARRAY; private byte[] source; + private int sourceOffset; + private int sourceLength; private byte[] extraSource; + private int extraSourceOffset; + private int extraSourceLength; public InternalSearchRequest() { } - public InternalSearchRequest(ShardRouting shardRouting, byte[] source) { - this(shardRouting.index(), shardRouting.id(), source); + public InternalSearchRequest(ShardRouting shardRouting) { + this(shardRouting.index(), shardRouting.id()); } - public InternalSearchRequest(String index, int shardId, byte[] source) { + public InternalSearchRequest(String index, int shardId) { this.index = index; this.shardId = shardId; - this.source = source; } public String index() { @@ -94,12 +97,41 @@ public class InternalSearchRequest implements Streamable { return this.source; } + public int sourceOffset() { + return sourceOffset; + } + + public int sourceLength() { + return sourceLength; + } + public byte[] extraSource() { return this.extraSource; } - public InternalSearchRequest extraSource(byte[] extraSource) { + public int extraSourceOffset() { + return extraSourceOffset; + } + + public int extraSourceLength() { + return extraSourceLength; + } + + public InternalSearchRequest source(byte[] source) { + return source(source, 0, source.length); + } + + public InternalSearchRequest source(byte[] source, int offset, int length) { + this.source = source; + this.sourceOffset = offset; + this.sourceLength = length; + return this; + } + + public InternalSearchRequest extraSource(byte[] extraSource, int offset, int length) { this.extraSource = extraSource; + this.extraSourceOffset = offset; + this.extraSourceLength = length; return this; } @@ -138,18 +170,20 @@ public class InternalSearchRequest implements Streamable { if (in.readBoolean()) { timeout = readTimeValue(in); } - int size = in.readVInt(); - if (size == 0) { + sourceOffset = 0; + sourceLength = in.readVInt(); + if (sourceLength == 0) { source = Bytes.EMPTY_ARRAY; } else { - source = new byte[size]; + source = new byte[sourceLength]; in.readFully(source); } - size = in.readVInt(); - if (size == 0) { + extraSourceOffset = 0; + extraSourceLength = in.readVInt(); + if (extraSourceLength == 0) { extraSource = Bytes.EMPTY_ARRAY; } else { - extraSource = new byte[size]; + extraSource = new byte[extraSourceLength]; in.readFully(extraSource); } int typesSize = in.readVInt(); @@ -179,14 +213,14 @@ public class InternalSearchRequest implements Streamable { if (source == null) { out.writeVInt(0); } else { - out.writeVInt(source.length); - out.writeBytes(source); + out.writeVInt(sourceLength); + out.writeBytes(source, sourceOffset, sourceLength); } if (extraSource == null) { out.writeVInt(0); } else { - out.writeVInt(extraSource.length); - out.writeBytes(extraSource); + out.writeVInt(extraSourceLength); + out.writeBytes(extraSource, extraSourceOffset, extraSourceLength); } out.writeVInt(types.length); for (String type : types) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java index 2b7e06aee68..264aa726397 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java @@ -194,7 +194,7 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractNodesTests { private InternalSearchRequest searchRequest(SearchSourceBuilder builder) { - return new InternalSearchRequest("test", 0, builder.buildAsBytes()); + return new InternalSearchRequest("test", 0).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java index 44510065f6f..8246ff02b35 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java @@ -354,7 +354,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { } private InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) { - return new InternalSearchRequest(shardRouting, builder.buildAsBytes()); + return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java index 4655c6a81d7..b1f75c10e5f 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java @@ -360,7 +360,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode } private static InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) { - return new InternalSearchRequest(shardRouting, builder.buildAsBytes()); + return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) {