Don't carry remoteUUIDs around, simplify remote alias filter handling

Also added explanation of when and why alias filters don't work at the moment
This commit is contained in:
javanna 2016-12-02 00:20:48 +01:00 committed by Luca Cavanna
parent 32eeaef6cf
commit 8cb2cec0c2
1 changed files with 28 additions and 34 deletions

View File

@ -88,8 +88,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState,
Index[] concreteIndices, String[] remoteUUIDs,
Map<String, AliasFilter> remoteAliasMap) {
Index[] concreteIndices, Map<String, AliasFilter> remoteAliasMap) {
final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
for (Index index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName());
@ -97,13 +96,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
assert aliasFilter != null;
aliasFilterMap.put(index.getUUID(), aliasFilter);
}
//TODO this is just a temporary workaround, alias filters need to be retrieved, at the moment they are ignored for remote indices
//they will be retrieved from search_shards from 5.1 on.
for (String remoteUUID : remoteUUIDs) {
// TODO does this map to be dense? Can't we handle this downstream?
AliasFilter orDefault = remoteAliasMap.getOrDefault(remoteUUID, new AliasFilter(null, Strings.EMPTY_ARRAY));
aliasFilterMap.put(remoteUUID, remoteAliasMap.getOrDefault(remoteUUID, new AliasFilter(null, Strings.EMPTY_ARRAY)));
}
aliasFilterMap.putAll(remoteAliasMap);
return aliasFilterMap;
}
@ -141,25 +134,24 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
if (remoteIndicesByCluster.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices,
Strings.EMPTY_ARRAY, Collections.emptyList(), Collections.emptySet(), Collections.emptyMap(), listener);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
Collections.emptySet(), Collections.emptyMap(), listener);
} else {
// nocommit we have to extract this logic to add unittests ideally with manually prepared searchShardsResponses etc.
searchTransportService.sendSearchShards(searchRequest, remoteIndicesByCluster,
ActionListener.wrap((searchShardsResponses) -> {
List<ShardIterator> remoteShardIterators = new ArrayList<>();
Set<DiscoveryNode> remoteNodes = new HashSet<>();
Set<String> remoteUUIDs = new HashSet<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
processRemoteShards(searchShardsResponses, remoteShardIterators, remoteNodes, remoteUUIDs, remoteAliasFilters);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices,
remoteUUIDs.toArray(new String[remoteUUIDs.size()]), remoteShardIterators, remoteNodes, remoteAliasFilters, listener);
processRemoteShards(searchShardsResponses, remoteShardIterators, remoteNodes, remoteAliasFilters);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators,
remoteNodes, remoteAliasFilters, listener);
}, listener::onFailure));
}
}
private void processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
List<ShardIterator> remoteShardIterators, Set<DiscoveryNode> remoteNodes, Set<String> remoteUUIDs,
List<ShardIterator> remoteShardIterators, Set<DiscoveryNode> remoteNodes,
Map<String, AliasFilter> aliasFilterMap) {
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterName = entry.getKey();
@ -169,29 +161,31 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId sid = clusterSearchShardsGroup.getShardId();
Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR +
sid.getIndex().getName(),
sid.getIndex().getUUID());
ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, sid.getId()),
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + shardId.getIndex().getName(),
shardId.getIndex().getUUID());
ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()));
remoteShardIterators.add(shardIterator);
remoteUUIDs.add(sid.getIndex().getUUID());
if (indicesAndFilters != null) {
AliasFilter aliasFilter = indicesAndFilters.get(sid.getIndexName());
if (aliasFilter != null) {
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(sid.getIndex().getUUID(), aliasFilter);
}
AliasFilter aliasFilter;
if (indicesAndFilters == null) {
//TODO this section is returned only by 5.1+ nodes. With 5.0.x nodes we should rather retrieve the alias filters
//using another api. What we do now causes the remote alias filters to be ignored whenever the node that we
//called search shards against was on 5.0.x.
aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null;
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
}
}
}
private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
String[] remoteUUIDs, List<ShardIterator> remoteShardIterators, Set<DiscoveryNode> remoteNodes,
Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
List<ShardIterator> remoteShardIterators, Set<DiscoveryNode> remoteNodes,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
@ -199,13 +193,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final Index[] indices;
if (localIndices.length == 0 && remoteUUIDs.length > 0) {
indices = new Index[0]; // don't search on ALL if nothing is specified
if (localIndices.length == 0 && remoteShardIterators.size() > 0) {
indices = new Index[0]; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, localIndices);
}
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteUUIDs, remoteAliasMap);
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
String[] concreteIndices = new String[indices.length];