internal support for custom attributes in scroll id

This commit is contained in:
kimchy 2011-03-19 21:51:18 +02:00
parent f3160b71bb
commit 52c750fc42
11 changed files with 83 additions and 46 deletions

View File

@ -21,6 +21,8 @@ package org.elasticsearch.action.search.type;
import org.elasticsearch.common.collect.Tuple;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
@ -36,12 +38,15 @@ public class ParsedScrollId {
private final String type;
private final Tuple<String, Long>[] values;
private final Tuple<String, Long>[] context;
public ParsedScrollId(String source, String type, Tuple<String, Long>[] values) {
private final Map<String, String> attributes;
public ParsedScrollId(String source, String type, Tuple<String, Long>[] context, Map<String, String> attributes) {
this.source = source;
this.type = type;
this.values = values;
this.context = context;
this.attributes = attributes;
}
public String source() {
@ -52,7 +57,11 @@ public class ParsedScrollId {
return type;
}
public Tuple<String, Long>[] values() {
return values;
public Tuple<String, Long>[] context() {
return context;
}
public Map<String, String> attributes() {
return this.attributes;
}
}

View File

@ -83,7 +83,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction {
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryFetchResults, ImmutableMap.<SearchShardTarget, FetchSearchResultProvider>of());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryFetchResults.values());
scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryResults(queryFetchResults);

View File

@ -168,7 +168,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), dfsResults);
scrollId = buildScrollId(request.searchType(), dfsResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

View File

@ -262,7 +262,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults);
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

View File

@ -27,7 +27,10 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
@ -35,6 +38,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.regex.Pattern;
/**
@ -79,23 +83,32 @@ public abstract class TransportSearchHelper {
return internalRequest;
}
public static String buildScrollId(SearchType searchType, Iterable<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
public static String buildScrollId(SearchType searchType, Collection<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults);
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes);
} else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults);
return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults, attributes);
} else if (searchType == SearchType.SCAN) {
return buildScrollId(ParsedScrollId.SCAN, searchPhaseResults);
return buildScrollId(ParsedScrollId.SCAN, searchPhaseResults, attributes);
} else {
throw new ElasticSearchIllegalStateException();
}
}
public static String buildScrollId(String type, Iterable<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
public static String buildScrollId(String type, Collection<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
StringBuilder sb = new StringBuilder().append(type).append(';');
sb.append(searchPhaseResults.size()).append(';');
for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {
sb.append(searchPhaseResult.id()).append(':').append(searchPhaseResult.shardTarget().nodeId()).append(';');
}
if (attributes == null) {
sb.append("0;");
} else {
sb.append(attributes.size()).append(";");
for (Map.Entry<String, String> entry : attributes.entrySet()) {
sb.append(entry.getKey()).append(':').append(entry.getValue()).append(';');
}
}
return Base64.encodeBytes(Unicode.fromStringAsBytes(sb.toString()), Base64.URL_SAFE);
}
@ -106,13 +119,28 @@ public abstract class TransportSearchHelper {
throw new ElasticSearchIllegalArgumentException("Failed to decode scrollId", e);
}
String[] elements = scrollIdPattern.split(scrollId);
@SuppressWarnings({"unchecked"}) Tuple<String, Long>[] values = new Tuple[elements.length - 1];
for (int i = 1; i < elements.length; i++) {
String element = elements[i];
int index = element.indexOf(':');
values[i - 1] = new Tuple<String, Long>(element.substring(index + 1), Long.parseLong(element.substring(0, index)));
int index = 0;
String type = elements[index++];
int contextSize = Integer.parseInt(elements[index++]);
@SuppressWarnings({"unchecked"}) Tuple<String, Long>[] context = new Tuple[contextSize];
for (int i = 0; i < contextSize; i++) {
String element = elements[index++];
int sep = element.indexOf(':');
context[i] = new Tuple<String, Long>(element.substring(sep + 1), Long.parseLong(element.substring(0, sep)));
}
return new ParsedScrollId(scrollId, elements[0], values);
Map<String, String> attributes;
int attributesSize = Integer.parseInt(elements[index++]);
if (attributesSize == 0) {
attributes = ImmutableMap.of();
} else {
attributes = Maps.newHashMapWithExpectedSize(attributesSize);
for (int i = 0; i < attributesSize; i++) {
String element = elements[index++];
int sep = element.indexOf(':');
attributes.put(element.substring(0, sep), element.substring(sep + 1));
}
}
return new ParsedScrollId(scrollId, type, context, attributes);
}
private TransportSearchHelper() {

View File

@ -80,7 +80,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryFetchResults.values());
scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryFetchResults(queryFetchResults);

View File

@ -175,7 +175,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values());
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values(), null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}

View File

@ -80,7 +80,7 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.<SearchShardTarget, FetchSearchResultProvider>of());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryResults.values());
scrollId = buildScrollId(request.searchType(), queryResults.values(), null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryResults(queryResults);

View File

@ -98,18 +98,18 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.values().length);
this.counter = new AtomicInteger(scrollId.values().length);
this.successfulOps = new AtomicInteger(scrollId.context().length);
this.counter = new AtomicInteger(scrollId.context().length);
}
public void start() {
if (scrollId.values().length == 0) {
if (scrollId.context().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
return;
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
@ -132,7 +132,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
@ -142,7 +142,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
for (final Tuple<String, Long> target : scrollId.context()) {
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
@ -159,7 +159,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
}
}
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node == null) {
if (logger.isDebugEnabled()) {
@ -212,7 +212,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
scrollId = request.scrollId();
}
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
}
}

View File

@ -104,18 +104,18 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.values().length);
this.successfulOps = new AtomicInteger(scrollId.context().length);
}
public void start() {
if (scrollId.values().length == 0) {
if (scrollId.context().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
return;
}
final AtomicInteger counter = new AtomicInteger(scrollId.values().length);
final AtomicInteger counter = new AtomicInteger(scrollId.context().length);
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
@ -138,7 +138,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executeQueryPhase(counter, node, target.v2());
@ -148,7 +148,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
for (final Tuple<String, Long> target : scrollId.context()) {
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
@ -239,7 +239,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);

View File

@ -102,12 +102,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.values().length);
this.counter = new AtomicInteger(scrollId.values().length);
this.successfulOps = new AtomicInteger(scrollId.context().length);
this.counter = new AtomicInteger(scrollId.context().length);
}
public void start() {
if (scrollId.values().length == 0) {
if (scrollId.context().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false);
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache)));
@ -115,7 +115,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
int localOperations = 0;
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null) {
if (nodes.localNodeId().equals(node.id())) {
@ -138,7 +138,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
executePhase(node, target.v2());
@ -148,7 +148,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final Tuple<String, Long> target : scrollId.values()) {
for (final Tuple<String, Long> target : scrollId.context()) {
final DiscoveryNode node = nodes.get(target.v1());
if (node != null && nodes.localNodeId().equals(node.id())) {
if (localAsync) {
@ -165,7 +165,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
}
for (Tuple<String, Long> target : scrollId.values()) {
for (Tuple<String, Long> target : scrollId.context()) {
DiscoveryNode node = nodes.get(target.v1());
if (node == null) {
if (logger.isDebugEnabled()) {
@ -236,10 +236,10 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
String scrollId = null;
if (request.scroll() != null) {
// we rebuild the scroll id since we remove shards that we finished scrolling on
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values());
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), null);
}
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
}
}