improve search serialization and deserialization

This commit is contained in:
kimchy 2010-05-13 18:39:24 +03:00
parent bbdbfbeb59
commit 836461e6de
10 changed files with 200 additions and 70 deletions

View File

@ -166,12 +166,14 @@ public class TransportMoreLikeThisAction extends BaseAction<MoreLikeThisRequest,
SearchRequest searchRequest = searchRequest(searchIndices)
.types(searchTypes)
.searchType(request.searchType())
.source(request.searchSource())
.scroll(request.searchScroll())
.extraSource(searchSource()
.query(boolBuilder)
)
.listenerThreaded(request.listenerThreaded());
if (request.searchSource() != null) {
searchRequest.source(request.searchSource());
}
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
@Override public void onResponse(SearchResponse response) {
listener.onResponse(response);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequest;
@ -36,8 +37,10 @@ import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.xcontent.XContentFactory;
import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static org.elasticsearch.action.Actions.*;
@ -69,8 +72,14 @@ public class SearchRequest implements ActionRequest {
private String queryHint;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private boolean sourceUnsafe;
private byte[] extraSource;
private int extraSourceOffset;
private int extraSourceLength;
private boolean extraSourceUnsafe;
private Scroll scroll;
@ -81,9 +90,6 @@ public class SearchRequest implements ActionRequest {
private boolean listenerThreaded = false;
private SearchOperationThreading operationThreading = SearchOperationThreading.SINGLE_THREAD;
private transient SearchSourceBuilder sourceBuilder;
private transient SearchSourceBuilder extraSourceBuilder;
SearchRequest() {
}
@ -105,12 +111,28 @@ public class SearchRequest implements ActionRequest {
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (source == null && sourceBuilder == null && extraSource == null && extraSourceBuilder == null) {
if (source == null && extraSource == null) {
validationException = addValidationError("search source is missing", validationException);
}
return validationException;
}
/**
* Internal.
*/
public void beforeLocalFork() {
if (source != null && sourceUnsafe) {
source = Arrays.copyOfRange(source, sourceOffset, sourceLength);
sourceOffset = 0;
sourceUnsafe = false;
}
if (extraSource != null && extraSourceUnsafe) {
extraSource = Arrays.copyOfRange(extraSource, extraSourceOffset, extraSourceLength);
extraSourceOffset = 0;
extraSourceUnsafe = false;
}
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -195,7 +217,11 @@ public class SearchRequest implements ActionRequest {
* The source of the search request.
*/
public SearchRequest source(SearchSourceBuilder sourceBuilder) {
this.sourceBuilder = sourceBuilder;
FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes();
this.source = bos.unsafeByteArray();
this.sourceOffset = 0;
this.sourceLength = bos.size();
this.sourceUnsafe = true;
return this;
}
@ -204,7 +230,12 @@ public class SearchRequest implements ActionRequest {
* {@link #source(org.elasticsearch.search.builder.SearchSourceBuilder)}.
*/
public SearchRequest source(String source) {
return source(Unicode.fromStringAsBytes(source));
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source);
this.source = result.result;
this.sourceOffset = 0;
this.sourceLength = result.length;
this.sourceUnsafe = true;
return this;
}
/**
@ -214,18 +245,39 @@ public class SearchRequest implements ActionRequest {
try {
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType);
builder.map(source);
this.source = builder.copiedBytes();
return source(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}
public SearchRequest source(XContentBuilder builder) {
try {
this.source = builder.unsafeBytes();
this.sourceOffset = 0;
this.sourceLength = builder.unsafeBytesLength();
this.sourceUnsafe = true;
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e);
}
}
/**
* The search source to execute.
*/
public SearchRequest source(byte[] source) {
return source(source, 0, source.length);
}
/**
* The search source to execute.
*/
public SearchRequest source(byte[] source, int offset, int length) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
this.sourceUnsafe = false;
return this;
}
@ -233,17 +285,26 @@ public class SearchRequest implements ActionRequest {
* The search source to execute.
*/
public byte[] source() {
if (source == null && sourceBuilder != null) {
source = sourceBuilder.buildAsBytes(contentType);
}
return source;
}
public int sourceOffset() {
return sourceOffset;
}
public int sourceLength() {
return sourceLength;
}
/**
* Allows to provide additional source that will be used as well.
*/
public SearchRequest extraSource(SearchSourceBuilder sourceBuilder) {
this.extraSourceBuilder = sourceBuilder;
FastByteArrayOutputStream bos = sourceBuilder.buildAsUnsafeBytes();
this.extraSource = bos.unsafeByteArray();
this.extraSourceOffset = 0;
this.extraSourceLength = bos.size();
this.extraSourceUnsafe = true;
return this;
}
@ -251,25 +312,51 @@ public class SearchRequest implements ActionRequest {
try {
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType);
builder.map(extraSource);
this.extraSource = builder.copiedBytes();
return extraSource(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}
public SearchRequest extraSource(XContentBuilder builder) {
try {
this.extraSource = builder.unsafeBytes();
this.extraSourceOffset = 0;
this.extraSourceLength = builder.unsafeBytesLength();
this.extraSourceUnsafe = true;
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e);
}
}
/**
* Allows to provide additional source that will use used as well.
*/
public SearchRequest extraSource(String source) {
return extraSource(Unicode.fromStringAsBytes(source));
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source);
this.extraSource = result.result;
this.extraSourceOffset = 0;
this.extraSourceLength = result.length;
this.extraSourceUnsafe = true;
return this;
}
/**
* Allows to provide additional source that will be used as well.
*/
public SearchRequest extraSource(byte[] source) {
return extraSource(source, 0, source.length);
}
/**
* Allows to provide additional source that will be used as well.
*/
public SearchRequest extraSource(byte[] source, int offset, int length) {
this.extraSource = source;
this.extraSourceOffset = offset;
this.extraSourceLength = length;
this.extraSourceUnsafe = false;
return this;
}
@ -277,12 +364,17 @@ public class SearchRequest implements ActionRequest {
* Additional search source to execute.
*/
public byte[] extraSource() {
if (extraSource == null && extraSourceBuilder != null) {
extraSource = extraSourceBuilder.buildAsBytes(contentType);
}
return this.extraSource;
}
public int extraSourceOffset() {
return extraSourceOffset;
}
public int extraSourceLength() {
return extraSourceLength;
}
/**
* The tye of search to execute.
*/
@ -368,18 +460,24 @@ public class SearchRequest implements ActionRequest {
if (in.readBoolean()) {
timeout = readTimeValue(in);
}
int size = in.readVInt();
if (size == 0) {
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
if (sourceLength == 0) {
source = Bytes.EMPTY_ARRAY;
} else {
source = new byte[size];
source = new byte[sourceLength];
in.readFully(source);
}
size = in.readVInt();
if (size == 0) {
extraSourceUnsafe = false;
extraSourceOffset = 0;
extraSourceLength = in.readVInt();
if (extraSourceLength == 0) {
extraSource = Bytes.EMPTY_ARRAY;
} else {
extraSource = new byte[size];
extraSource = new byte[extraSourceLength];
in.readFully(extraSource);
}
@ -420,29 +518,17 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
timeout.writeTo(out);
}
if (source == null && sourceBuilder == null) {
if (source == null) {
out.writeVInt(0);
} else {
if (source != null) {
out.writeVInt(source.length);
out.writeBytes(source);
} else {
FastByteArrayOutputStream os = sourceBuilder.buildAsUnsafeBytes(contentType);
out.writeVInt(os.size());
out.writeBytes(os.unsafeByteArray(), 0, os.size());
}
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
}
if (extraSource == null && extraSourceBuilder == null) {
if (extraSource == null) {
out.writeVInt(0);
} else {
if (extraSource != null) {
out.writeVInt(extraSource.length);
out.writeBytes(extraSource);
} else {
FastByteArrayOutputStream os = extraSourceBuilder.buildAsUnsafeBytes(contentType);
out.writeVInt(os.size());
out.writeBytes(os.unsafeByteArray(), 0, os.size());
}
out.writeVInt(extraSourceLength);
out.writeBytes(extraSource, extraSourceOffset, extraSourceLength);
}
out.writeVInt(types.length);
for (String type : types) {

View File

@ -64,8 +64,9 @@ public abstract class TransportSearchHelper {
}
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, SearchRequest request) {
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, request.source());
internalRequest.extraSource(request.extraSource());
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting);
internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());
internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength());
internalRequest.scroll(request.scroll());
internalRequest.timeout(request.timeout());
internalRequest.types(request.types());

View File

@ -132,6 +132,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
// we have local operations, perform them now
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
for (final ShardsIterator shardIt : shardsIts) {
@ -146,6 +147,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
if (localAsync) {
request.beforeLocalFork();
}
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {

View File

@ -163,8 +163,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
// we have local operations, perform them now
if (localOperations > 0) {
request.beforeLocalFork();
if (request.operationThreading() == BroadcastOperationThreading.SINGLE_THREAD) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
for (final ShardsIterator shardIt : shardsIts) {
@ -179,6 +179,9 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
});
} else {
boolean localAsync = request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD;
if (localAsync) {
request.beforeLocalFork();
}
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {

View File

@ -289,8 +289,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.scroll(request.scroll());
parseSource(context, request.source());
parseSource(context, request.extraSource());
parseSource(context, request.source(), request.sourceOffset(), request.sourceLength());
parseSource(context, request.extraSource(), request.extraSourceOffset(), request.extraSourceLength());
// if the from and size are still not set, default them
if (context.from() == -1) {
@ -330,13 +330,13 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.release();
}
private void parseSource(SearchContext context, byte[] source) throws SearchParseException {
private void parseSource(SearchContext context, byte[] source, int offset, int length) throws SearchParseException {
// nothing to parse...
if (source == null || source.length == 0) {
if (source == null || length == 0) {
return;
}
try {
XContentParser parser = XContentFactory.xContent(source).createParser(source);
XContentParser parser = XContentFactory.xContent(source, offset, length).createParser(source, offset, length);
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -352,7 +352,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
} catch (Exception e) {
throw new SearchParseException(context, "Failed to parse [" + Unicode.fromBytes(source) + "]", e);
throw new SearchParseException(context, "Failed to parse [" + Unicode.fromBytes(source, offset, length) + "]", e);
}
}

View File

@ -66,20 +66,23 @@ public class InternalSearchRequest implements Streamable {
private String[] types = Strings.EMPTY_ARRAY;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private byte[] extraSource;
private int extraSourceOffset;
private int extraSourceLength;
public InternalSearchRequest() {
}
public InternalSearchRequest(ShardRouting shardRouting, byte[] source) {
this(shardRouting.index(), shardRouting.id(), source);
public InternalSearchRequest(ShardRouting shardRouting) {
this(shardRouting.index(), shardRouting.id());
}
public InternalSearchRequest(String index, int shardId, byte[] source) {
public InternalSearchRequest(String index, int shardId) {
this.index = index;
this.shardId = shardId;
this.source = source;
}
public String index() {
@ -94,12 +97,41 @@ public class InternalSearchRequest implements Streamable {
return this.source;
}
public int sourceOffset() {
return sourceOffset;
}
public int sourceLength() {
return sourceLength;
}
public byte[] extraSource() {
return this.extraSource;
}
public InternalSearchRequest extraSource(byte[] extraSource) {
public int extraSourceOffset() {
return extraSourceOffset;
}
public int extraSourceLength() {
return extraSourceLength;
}
public InternalSearchRequest source(byte[] source) {
return source(source, 0, source.length);
}
public InternalSearchRequest source(byte[] source, int offset, int length) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
return this;
}
public InternalSearchRequest extraSource(byte[] extraSource, int offset, int length) {
this.extraSource = extraSource;
this.extraSourceOffset = offset;
this.extraSourceLength = length;
return this;
}
@ -138,18 +170,20 @@ public class InternalSearchRequest implements Streamable {
if (in.readBoolean()) {
timeout = readTimeValue(in);
}
int size = in.readVInt();
if (size == 0) {
sourceOffset = 0;
sourceLength = in.readVInt();
if (sourceLength == 0) {
source = Bytes.EMPTY_ARRAY;
} else {
source = new byte[size];
source = new byte[sourceLength];
in.readFully(source);
}
size = in.readVInt();
if (size == 0) {
extraSourceOffset = 0;
extraSourceLength = in.readVInt();
if (extraSourceLength == 0) {
extraSource = Bytes.EMPTY_ARRAY;
} else {
extraSource = new byte[size];
extraSource = new byte[extraSourceLength];
in.readFully(extraSource);
}
int typesSize = in.readVInt();
@ -179,14 +213,14 @@ public class InternalSearchRequest implements Streamable {
if (source == null) {
out.writeVInt(0);
} else {
out.writeVInt(source.length);
out.writeBytes(source);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
}
if (extraSource == null) {
out.writeVInt(0);
} else {
out.writeVInt(extraSource.length);
out.writeBytes(extraSource);
out.writeVInt(extraSourceLength);
out.writeBytes(extraSource, extraSourceOffset, extraSourceLength);
}
out.writeVInt(types.length);
for (String type : types) {

View File

@ -194,7 +194,7 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractNodesTests {
private InternalSearchRequest searchRequest(SearchSourceBuilder builder) {
return new InternalSearchRequest("test", 0, builder.buildAsBytes());
return new InternalSearchRequest("test", 0).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {

View File

@ -354,7 +354,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
}
private InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) {
return new InternalSearchRequest(shardRouting, builder.buildAsBytes());
return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {

View File

@ -360,7 +360,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
}
private static InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) {
return new InternalSearchRequest(shardRouting, builder.buildAsBytes());
return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {