reduce shard search response size by not serilaizing the node id, index, and shard id

This commit is contained in:
kimchy 2010-09-02 11:19:25 +03:00
parent 042af200e8
commit 0250896a09
17 changed files with 237 additions and 85 deletions

View File

@ -35,7 +35,7 @@ import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.search.SearchType.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportSearchAction extends BaseAction<SearchRequest, SearchResponse> {

View File

@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction {
@ -87,42 +87,42 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
final AtomicInteger counter = new AtomicInteger(dfsResults.size());
int localOperations = 0;
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
executeSecondPhase(counter, node, querySearchRequest);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
executeSecondPhase(counter, node, querySearchRequest);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executeSecondPhase(counter, node, querySearchRequest);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(counter, node, querySearchRequest);
executeSecondPhase(dfsResult, counter, node, querySearchRequest);
}
}
}
@ -130,9 +130,10 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
}
}
private void executeSecondPhase(final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
private void executeSecondPhase(final DfsSearchResult dfsResult, final AtomicInteger counter, DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new SearchServiceListener<QueryFetchSearchResult>() {
@Override public void onResult(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -48,7 +48,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction {
@ -93,13 +93,13 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
int localOperations = 0;
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
executeQuery(counter, querySearchRequest, node);
executeQuery(dfsResult, counter, querySearchRequest, node);
}
}
@ -107,29 +107,29 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
executeQuery(counter, querySearchRequest, node);
executeQuery(dfsResult, counter, querySearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (DfsSearchResult dfsResult : dfsResults) {
for (final DfsSearchResult dfsResult : dfsResults) {
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(dfsResult.id(), dfs);
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executeQuery(counter, querySearchRequest, node);
executeQuery(dfsResult, counter, querySearchRequest, node);
}
});
} else {
executeQuery(counter, querySearchRequest, node);
executeQuery(dfsResult, counter, querySearchRequest, node);
}
}
}
@ -137,9 +137,10 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
private void executeQuery(final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
private void executeQuery(final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() {
@Override public void onResult(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@ -178,13 +179,13 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
int localOperations = 0;
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
@ -192,29 +193,29 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
final DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
}
@ -222,9 +223,10 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
}
}
private void executeFetch(final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
private void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -93,13 +93,13 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
int localOperations = 0;
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
@ -107,29 +107,29 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
final DiscoveryNode node = nodes.get(entry.getKey().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(entry.getKey()).id(), entry.getValue());
if (localAsync) {
threadPool.execute(new Runnable() {
@Override public void run() {
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
});
} else {
executeFetch(counter, fetchSearchRequest, node);
executeFetch(entry.getKey(), counter, fetchSearchRequest, node);
}
}
}
@ -137,9 +137,10 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
}
}
private void executeFetch(final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
private void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override public void onResult(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -195,13 +195,14 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.size());
for (Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
for (final Map.Entry<SearchShardTarget, ExtTIntArrayList> entry : docIdsToLoad.entrySet()) {
SearchShardTarget shardTarget = entry.getKey();
ExtTIntArrayList docIds = entry.getValue();
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(queryResults.get(shardTarget).id(), docIds);
DiscoveryNode node = nodes.get(shardTarget.nodeId());
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override public void onResult(FetchSearchResult result) {
result.shardTarget(entry.getKey());
fetchResults.put(result.shardTarget(), result);
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
@ -77,7 +78,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
this.searchPhaseController = searchPhaseController;
}
protected abstract class BaseAsyncAction<FirstResult> {
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> {
protected final ActionListener<SearchResponse> listener;
@ -205,6 +206,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shard, result);
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them

View File

@ -29,4 +29,6 @@ public interface SearchPhaseResult extends Streamable {
long id();
SearchShardTarget shardTarget();
void shardTarget(SearchShardTarget shardTarget);
}

View File

@ -45,17 +45,11 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.*;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.*;
import org.elasticsearch.timer.TimerService;
import javax.annotation.Nullable;
@ -174,12 +168,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
public QuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
try {
processScroll(request, context);
queryPhase.execute(context);
return context.queryResult();
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (RuntimeException e) {
freeContext(context);
throw e;
@ -241,7 +235,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
public QueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
SearchContext context = findContext(request.id());
try {
processScroll(request, context);
@ -251,7 +245,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
if (context.scroll() == null) {
freeContext(request.id());
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (RuntimeException e) {
freeContext(context);
throw e;
@ -287,7 +281,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.state().nodes().localNodeId(), request.index(), request.shardId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
Engine.Searcher engineSearcher = indexShard.searcher();
SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.numberOfShards(), request.timeout(), request.types(), engineSearcher, indexService, scriptService);

View File

@ -32,10 +32,12 @@ import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.transport.*;
/**
@ -186,20 +188,20 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QuerySearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QuerySearchResult result = searchService.executeQueryPhase(request);
listener.onResult(result);
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
listener.onResult(result.queryResult());
} catch (Exception e) {
listener.onFailure(e);
}
} else {
transportService.sendRequest(node, SearchQueryScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<QuerySearchResult>() {
transportService.sendRequest(node, SearchQueryScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQuerySearchResult>() {
@Override public QuerySearchResult newInstance() {
return new QuerySearchResult();
@Override public ScrollQuerySearchResult newInstance() {
return new ScrollQuerySearchResult();
}
@Override public void handleResponse(QuerySearchResult response) {
listener.onResult(response);
@Override public void handleResponse(ScrollQuerySearchResult response) {
listener.onResult(response.queryResult());
}
@Override public void handleException(RemoteTransportException exp) {
@ -276,20 +278,20 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final SearchServiceListener<QueryFetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result);
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
listener.onResult(result.result());
} catch (Exception e) {
listener.onFailure(e);
}
} else {
transportService.sendRequest(node, SearchQueryFetchScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<QueryFetchSearchResult>() {
transportService.sendRequest(node, SearchQueryFetchScrollTransportHandler.ACTION, request, new BaseTransportResponseHandler<ScrollQueryFetchSearchResult>() {
@Override public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult();
@Override public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult();
}
@Override public void handleResponse(QueryFetchSearchResult response) {
listener.onResult(response);
@Override public void handleResponse(ScrollQueryFetchSearchResult response) {
listener.onResult(response.result());
}
@Override public void handleException(RemoteTransportException exp) {
@ -399,7 +401,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
@Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request);
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
}
@ -455,7 +457,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
@Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}

View File

@ -27,8 +27,6 @@ import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
import static org.elasticsearch.search.SearchShardTarget.*;
/**
* @author kimchy (shay.banon)
*/
@ -65,6 +63,10 @@ public class DfsSearchResult implements SearchPhaseResult {
return shardTarget;
}
@Override public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
public DfsSearchResult maxDoc(int maxDoc) {
this.maxDoc = maxDoc;
return this;
@ -96,7 +98,7 @@ public class DfsSearchResult implements SearchPhaseResult {
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
shardTarget = readSearchShardTarget(in);
// shardTarget = readSearchShardTarget(in);
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
@ -120,7 +122,7 @@ public class DfsSearchResult implements SearchPhaseResult {
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
shardTarget.writeTo(out);
// shardTarget.writeTo(out);
out.writeVInt(terms.length);
for (Term term : terms) {
out.writeUTF(term.field());

View File

@ -27,7 +27,6 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import java.io.IOException;
import static org.elasticsearch.search.SearchShardTarget.*;
import static org.elasticsearch.search.internal.InternalSearchHits.*;
/**
@ -65,6 +64,10 @@ public class FetchSearchResult implements Streamable, FetchSearchResultProvider
return this.shardTarget;
}
@Override public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
public void hits(InternalSearchHits hits) {
this.hits = hits;
}
@ -90,13 +93,13 @@ public class FetchSearchResult implements Streamable, FetchSearchResultProvider
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
shardTarget = readSearchShardTarget(in);
// shardTarget = readSearchShardTarget(in);
hits = readSearchHits(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
shardTarget.writeTo(out);
// shardTarget.writeTo(out);
hits.writeTo(out);
}
}

View File

@ -32,7 +32,7 @@ import static org.elasticsearch.search.fetch.FetchSearchResult.*;
import static org.elasticsearch.search.query.QuerySearchResult.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class QueryFetchSearchResult implements Streamable, QuerySearchResultProvider, FetchSearchResultProvider {
@ -57,6 +57,11 @@ public class QueryFetchSearchResult implements Streamable, QuerySearchResultProv
return queryResult.shardTarget();
}
@Override public void shardTarget(SearchShardTarget shardTarget) {
queryResult.shardTarget(shardTarget);
fetchResult.shardTarget(shardTarget);
}
@Override public boolean includeFetch() {
return true;
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
import static org.elasticsearch.search.SearchShardTarget.*;
import static org.elasticsearch.search.fetch.QueryFetchSearchResult.*;
/**
* @author kimchy (shay.banon)
*/
public class ScrollQueryFetchSearchResult implements Streamable {
private QueryFetchSearchResult result;
private SearchShardTarget shardTarget;
public ScrollQueryFetchSearchResult() {
}
public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) {
this.result = result;
this.shardTarget = shardTarget;
}
public QueryFetchSearchResult result() {
return result;
}
public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardTarget = readSearchShardTarget(in);
result = readQueryFetchSearchResult(in);
result.shardTarget(shardTarget);
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardTarget.writeTo(out);
result.writeTo(out);
}
}

View File

@ -30,10 +30,9 @@ import org.elasticsearch.search.facets.internal.InternalFacets;
import java.io.IOException;
import static org.elasticsearch.common.lucene.Lucene.*;
import static org.elasticsearch.search.SearchShardTarget.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class QuerySearchResult implements Streamable, QuerySearchResultProvider {
@ -76,6 +75,10 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider
return shardTarget;
}
@Override public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
public void searchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}
@ -126,7 +129,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readLong();
shardTarget = readSearchShardTarget(in);
// shardTarget = readSearchShardTarget(in);
from = in.readVInt();
size = in.readVInt();
topDocs = readTopDocs(in);
@ -138,7 +141,7 @@ public class QuerySearchResult implements Streamable, QuerySearchResultProvider
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
shardTarget.writeTo(out);
// shardTarget.writeTo(out);
out.writeVInt(from);
out.writeVInt(size);
writeTopDocs(out, topDocs, 0);

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
import static org.elasticsearch.search.SearchShardTarget.*;
import static org.elasticsearch.search.query.QuerySearchResult.*;
/**
* @author kimchy (shay.banon)
*/
public class ScrollQuerySearchResult implements Streamable {
private QuerySearchResult queryResult;
private SearchShardTarget shardTarget;
public ScrollQuerySearchResult() {
}
public ScrollQuerySearchResult(QuerySearchResult queryResult, SearchShardTarget shardTarget) {
this.queryResult = queryResult;
this.shardTarget = shardTarget;
}
public QuerySearchResult queryResult() {
return queryResult;
}
public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardTarget = readSearchShardTarget(in);
queryResult = readQuerySearchResult(in);
queryResult.shardTarget(shardTarget);
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardTarget.writeTo(out);
queryResult.writeTo(out);
}
}

View File

@ -152,7 +152,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
// now try and scroll to the next batch of results
Map<SearchShardTarget, QuerySearchResultProvider> scrollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())));
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())).queryResult());
}
queryResults = scrollQueryResults;
@ -219,7 +219,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
// now try and scroll to the next batch of results
Map<SearchShardTarget, QuerySearchResultProvider> scrollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id()).scroll(new Scroll(timeValueMinutes(10)))));
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id()).scroll(new Scroll(timeValueMinutes(10)))).queryResult());
}
queryResults = scrollQueryResults;
@ -245,7 +245,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
// now try and scroll to the next next batch of results
scrollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())));
scrollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())).queryResult());
}
queryResults = scrollQueryResults;
@ -295,7 +295,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
// scrolling with query+fetch is not perfect when it comes to dist sorting
Map<SearchShardTarget, QueryFetchSearchResult> scrollQueryFetchResults = newHashMap();
for (QueryFetchSearchResult searchResult : queryFetchResults.values()) {
QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(searchResult.shardTarget().nodeId()).executeFetchPhase(new InternalScrollSearchRequest(searchResult.id()).scroll(new Scroll(timeValueMinutes(10))));
QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(searchResult.shardTarget().nodeId()).executeFetchPhase(new InternalScrollSearchRequest(searchResult.id()).scroll(new Scroll(timeValueMinutes(10)))).result();
scrollQueryFetchResults.put(queryFetchResult.shardTarget(), queryFetchResult);
}
queryFetchResults = scrollQueryFetchResults;

View File

@ -157,7 +157,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// now try and scroll to the next batch of results
Map<SearchShardTarget, QuerySearchResultProvider> scollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())));
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())).queryResult());
}
queryResults = scollQueryResults;
@ -224,7 +224,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// now try and scroll to the next batch of results
Map<SearchShardTarget, QuerySearchResultProvider> scollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id()).scroll(new Scroll(timeValueMinutes(10)))));
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id()).scroll(new Scroll(timeValueMinutes(10)))).queryResult());
}
queryResults = scollQueryResults;
@ -250,7 +250,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// now try and scroll to the next next batch of results
scollQueryResults = newHashMap();
for (QuerySearchResultProvider queryResult : queryResults.values()) {
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())));
scollQueryResults.put(queryResult.queryResult().shardTarget(), nodeToSearchService.get(queryResult.shardTarget().nodeId()).executeQueryPhase(new InternalScrollSearchRequest(queryResult.id())).queryResult());
}
queryResults = scollQueryResults;