From 52c750fc42adc3f7318581984f4693f3f8f73685 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 19 Mar 2011 21:51:18 +0200 Subject: [PATCH] internal support for custom attributes in scroll id --- .../action/search/type/ParsedScrollId.java | 19 +++++-- .../type/TransportSearchCountAction.java | 2 +- ...TransportSearchDfsQueryAndFetchAction.java | 2 +- ...ransportSearchDfsQueryThenFetchAction.java | 2 +- .../search/type/TransportSearchHelper.java | 50 +++++++++++++++---- .../TransportSearchQueryAndFetchAction.java | 2 +- .../TransportSearchQueryThenFetchAction.java | 2 +- .../type/TransportSearchScanAction.java | 2 +- ...nsportSearchScrollQueryAndFetchAction.java | 16 +++--- ...sportSearchScrollQueryThenFetchAction.java | 14 +++--- .../type/TransportSearchScrollScanAction.java | 18 +++---- 11 files changed, 83 insertions(+), 46 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java index e943eaedf90..a20dbcd95a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java @@ -21,6 +21,8 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.common.collect.Tuple; +import java.util.Map; + /** * @author kimchy (shay.banon) */ @@ -36,12 +38,15 @@ public class ParsedScrollId { private final String type; - private final Tuple[] values; + private final Tuple[] context; - public ParsedScrollId(String source, String type, Tuple[] values) { + private final Map attributes; + + public ParsedScrollId(String source, String type, Tuple[] context, Map attributes) { this.source = source; this.type = type; - this.values = values; + this.context = context; + this.attributes = attributes; } public String source() { @@ -52,7 +57,11 @@ public class ParsedScrollId { return type; } - public Tuple[] values() { - return values; + public Tuple[] context() { + return context; + } + + public Map attributes() { + return this.attributes; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java index a234fcde1ef..7e7ff7b40f8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java @@ -83,7 +83,7 @@ public class TransportSearchCountAction extends TransportSearchTypeAction { final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryFetchResults, ImmutableMap.of()); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryFetchResults.values()); + scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); searchCache.releaseQueryResults(queryFetchResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 406167b95b7..ed7005ee53d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -168,7 +168,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), dfsResults); + scrollId = buildScrollId(request.searchType(), dfsResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 4e3343a8902..9c807324c75 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -262,7 +262,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults); + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults, null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index 570f13a0f27..90e404748dd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -27,7 +27,10 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Base64; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; @@ -35,6 +38,7 @@ import org.elasticsearch.search.internal.InternalSearchRequest; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.regex.Pattern; /** @@ -79,23 +83,32 @@ public abstract class TransportSearchHelper { return internalRequest; } - public static String buildScrollId(SearchType searchType, Iterable searchPhaseResults) throws IOException { + public static String buildScrollId(SearchType searchType, Collection searchPhaseResults, @Nullable Map attributes) throws IOException { if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) { - return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults); + return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes); } else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) { - return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults); + return buildScrollId(ParsedScrollId.QUERY_AND_FETCH_TYPE, searchPhaseResults, attributes); } else if (searchType == SearchType.SCAN) { - return buildScrollId(ParsedScrollId.SCAN, searchPhaseResults); + return buildScrollId(ParsedScrollId.SCAN, searchPhaseResults, attributes); } else { throw new ElasticSearchIllegalStateException(); } } - public static String buildScrollId(String type, Iterable searchPhaseResults) throws IOException { + public static String buildScrollId(String type, Collection searchPhaseResults, @Nullable Map attributes) throws IOException { StringBuilder sb = new StringBuilder().append(type).append(';'); + sb.append(searchPhaseResults.size()).append(';'); for (SearchPhaseResult searchPhaseResult : searchPhaseResults) { sb.append(searchPhaseResult.id()).append(':').append(searchPhaseResult.shardTarget().nodeId()).append(';'); } + if (attributes == null) { + sb.append("0;"); + } else { + sb.append(attributes.size()).append(";"); + for (Map.Entry entry : attributes.entrySet()) { + sb.append(entry.getKey()).append(':').append(entry.getValue()).append(';'); + } + } return Base64.encodeBytes(Unicode.fromStringAsBytes(sb.toString()), Base64.URL_SAFE); } @@ -106,13 +119,28 @@ public abstract class TransportSearchHelper { throw new ElasticSearchIllegalArgumentException("Failed to decode scrollId", e); } String[] elements = scrollIdPattern.split(scrollId); - @SuppressWarnings({"unchecked"}) Tuple[] values = new Tuple[elements.length - 1]; - for (int i = 1; i < elements.length; i++) { - String element = elements[i]; - int index = element.indexOf(':'); - values[i - 1] = new Tuple(element.substring(index + 1), Long.parseLong(element.substring(0, index))); + int index = 0; + String type = elements[index++]; + int contextSize = Integer.parseInt(elements[index++]); + @SuppressWarnings({"unchecked"}) Tuple[] context = new Tuple[contextSize]; + for (int i = 0; i < contextSize; i++) { + String element = elements[index++]; + int sep = element.indexOf(':'); + context[i] = new Tuple(element.substring(sep + 1), Long.parseLong(element.substring(0, sep))); } - return new ParsedScrollId(scrollId, elements[0], values); + Map attributes; + int attributesSize = Integer.parseInt(elements[index++]); + if (attributesSize == 0) { + attributes = ImmutableMap.of(); + } else { + attributes = Maps.newHashMapWithExpectedSize(attributesSize); + for (int i = 0; i < attributesSize; i++) { + String element = elements[index++]; + int sep = element.indexOf(':'); + attributes.put(element.substring(0, sep), element.substring(sep + 1)); + } + } + return new ParsedScrollId(scrollId, type, context, attributes); } private TransportSearchHelper() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index e9f5506accc..1352302190f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -80,7 +80,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryFetchResults.values()); + scrollId = buildScrollId(request.searchType(), queryFetchResults.values(), null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); searchCache.releaseQueryFetchResults(queryFetchResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a7b8e52fd4d..eeb96b87e41 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -175,7 +175,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); String scrollId = null; if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values()); + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values(), null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java index 41ecbae4311..42758cdbfc7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java @@ -80,7 +80,7 @@ public class TransportSearchScanAction extends TransportSearchTypeAction { final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.of()); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryResults.values()); + scrollId = buildScrollId(request.searchType(), queryResults.values(), null); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); searchCache.releaseQueryResults(queryResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 62139431525..596eb4ddf4d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -98,18 +98,18 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent this.listener = listener; this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.values().length); - this.counter = new AtomicInteger(scrollId.values().length); + this.successfulOps = new AtomicInteger(scrollId.context().length); + this.counter = new AtomicInteger(scrollId.context().length); } public void start() { - if (scrollId.values().length == 0) { + if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); return; } int localOperations = 0; - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { @@ -132,7 +132,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { executePhase(node, target.v2()); @@ -142,7 +142,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.values()) { + for (final Tuple target : scrollId.context()) { final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { @@ -159,7 +159,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent } } - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { if (logger.isDebugEnabled()) { @@ -212,7 +212,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent scrollId = request.scrollId(); } searchCache.releaseQueryFetchResults(queryFetchResults); - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 074f2e718f4..f76cf0d1ed2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -104,18 +104,18 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent this.listener = listener; this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.values().length); + this.successfulOps = new AtomicInteger(scrollId.context().length); } public void start() { - if (scrollId.values().length == 0) { + if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); return; } - final AtomicInteger counter = new AtomicInteger(scrollId.values().length); + final AtomicInteger counter = new AtomicInteger(scrollId.context().length); int localOperations = 0; - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { @@ -138,7 +138,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { executeQueryPhase(counter, node, target.v2()); @@ -148,7 +148,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.values()) { + for (final Tuple target : scrollId.context()) { final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { @@ -239,7 +239,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent if (request.scroll() != null) { scrollId = request.scrollId(); } - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); searchCache.releaseQueryResults(queryResults); searchCache.releaseFetchResults(fetchResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 63eaea44c7b..9cccbf3665b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -102,12 +102,12 @@ public class TransportSearchScrollScanAction extends AbstractComponent { this.listener = listener; this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.values().length); - this.counter = new AtomicInteger(scrollId.values().length); + this.successfulOps = new AtomicInteger(scrollId.context().length); + this.counter = new AtomicInteger(scrollId.context().length); } public void start() { - if (scrollId.values().length == 0) { + if (scrollId.context().length == 0) { final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false); searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); @@ -115,7 +115,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } int localOperations = 0; - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null) { if (nodes.localNodeId().equals(node.id())) { @@ -138,7 +138,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { executePhase(node, target.v2()); @@ -148,7 +148,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { }); } else { boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final Tuple target : scrollId.values()) { + for (final Tuple target : scrollId.context()) { final DiscoveryNode node = nodes.get(target.v1()); if (node != null && nodes.localNodeId().equals(node.id())) { if (localAsync) { @@ -165,7 +165,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } - for (Tuple target : scrollId.values()) { + for (Tuple target : scrollId.context()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { if (logger.isDebugEnabled()) { @@ -236,10 +236,10 @@ public class TransportSearchScrollScanAction extends AbstractComponent { String scrollId = null; if (request.scroll() != null) { // we rebuild the scroll id since we remove shards that we finished scrolling on - scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values()); + scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), null); } searchCache.releaseQueryFetchResults(queryFetchResults); - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(), + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); } }