Remove DFS support from TermVector API

Retrieving distributed DF for TermVectors is beside it's esotheric justification
a very slow process and can cause serious load on the cluster. We also don't have nearly
enough testing for this stuff and given the complexity we should remove it rather than carrying it
around.
This commit is contained in:
Simon Willnauer 2016-02-04 16:20:24 +01:00
parent 15507580bb
commit 450ee70038
14 changed files with 5 additions and 636 deletions

View File

@ -190,14 +190,11 @@ import org.elasticsearch.action.termvectors.TermVectorsAction;
import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction; import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction;
import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction; import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction;
import org.elasticsearch.action.termvectors.TransportTermVectorsAction; import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
import org.elasticsearch.action.termvectors.dfs.TransportDfsOnlyAction;
import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.NodeModule;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -323,8 +320,7 @@ public class ActionModule extends AbstractModule {
registerAction(IndexAction.INSTANCE, TransportIndexAction.class); registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
registerAction(GetAction.INSTANCE, TransportGetAction.class); registerAction(GetAction.INSTANCE, TransportGetAction.class);
registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class, registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
TransportDfsOnlyAction.class);
registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class, registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportShardMultiTermsVectorAction.class); TransportShardMultiTermsVectorAction.class);
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class); registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);

View File

@ -373,22 +373,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
return this; return this;
} }
/**
* @return <code>true</code> if distributed frequencies should be returned. Otherwise
* <code>false</code>
*/
public boolean dfs() {
return flagsEnum.contains(Flag.Dfs);
}
/**
* Use distributed frequencies instead of shard statistics.
*/
public TermVectorsRequest dfs(boolean dfs) {
setFlag(Flag.Dfs, dfs);
return this;
}
/** /**
* Return only term vectors for special selected fields. Returns for term * Return only term vectors for special selected fields. Returns for term
* vectors for all fields if selectedFields == null * vectors for all fields if selectedFields == null
@ -583,7 +567,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
public static enum Flag { public static enum Flag {
// Do not change the order of these flags we use // Do not change the order of these flags we use
// the ordinal for encoding! Only append to the end! // the ordinal for encoding! Only append to the end!
Positions, Offsets, Payloads, FieldStatistics, TermStatistics, Dfs Positions, Offsets, Payloads, FieldStatistics, TermStatistics
} }
/** /**
@ -616,7 +600,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
} else if (currentFieldName.equals("field_statistics") || currentFieldName.equals("fieldStatistics")) { } else if (currentFieldName.equals("field_statistics") || currentFieldName.equals("fieldStatistics")) {
termVectorsRequest.fieldStatistics(parser.booleanValue()); termVectorsRequest.fieldStatistics(parser.booleanValue());
} else if (currentFieldName.equals("dfs")) { } else if (currentFieldName.equals("dfs")) {
termVectorsRequest.dfs(parser.booleanValue()); throw new IllegalArgumentException("distributed frequencies is not supported anymore for term vectors");
} else if (currentFieldName.equals("per_field_analyzer") || currentFieldName.equals("perFieldAnalyzer")) { } else if (currentFieldName.equals("per_field_analyzer") || currentFieldName.equals("perFieldAnalyzer")) {
termVectorsRequest.perFieldAnalyzer(readPerFieldAnalyzer(parser.map())); termVectorsRequest.perFieldAnalyzer(readPerFieldAnalyzer(parser.map()));
} else if (currentFieldName.equals("filter")) { } else if (currentFieldName.equals("filter")) {

View File

@ -149,14 +149,6 @@ public class TermVectorsRequestBuilder extends ActionRequestBuilder<TermVectorsR
return this; return this;
} }
/**
* Sets whether to use distributed frequencies instead of shard statistics.
*/
public TermVectorsRequestBuilder setDfs(boolean dfs) {
request.dfs(dfs);
return this;
}
/** /**
* Sets whether to return only term vectors for special selected fields. Returns the term * Sets whether to return only term vectors for special selected fields. Returns the term
* vectors for all fields if selectedFields == null * vectors for all fields if selectedFields == null

View File

@ -1,112 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvectors.dfs;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
public class DfsOnlyRequest extends BroadcastRequest<DfsOnlyRequest> {
private SearchRequest searchRequest = new SearchRequest();
long nowInMillis;
public DfsOnlyRequest() {
}
public DfsOnlyRequest(Fields termVectorsFields, String[] indices, String[] types, Set<String> selectedFields) throws IOException {
super(indices);
// build a search request with a query of all the terms
final BoolQueryBuilder boolBuilder = boolQuery();
for (String fieldName : termVectorsFields) {
if ((selectedFields != null) && (!selectedFields.contains(fieldName))) {
continue;
}
Terms terms = termVectorsFields.terms(fieldName);
TermsEnum iterator = terms.iterator();
while (iterator.next() != null) {
String text = iterator.term().utf8ToString();
boolBuilder.should(QueryBuilders.termQuery(fieldName, text));
}
}
// wrap a search request object
this.searchRequest = new SearchRequest(indices).types(types).source(new SearchSourceBuilder().query(boolBuilder));
}
public SearchRequest getSearchRequest() {
return searchRequest;
}
@Override
public ActionRequestValidationException validate() {
return searchRequest.validate();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.searchRequest.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.searchRequest.writeTo(out);
}
public String[] types() {
return this.searchRequest.types();
}
public String routing() {
return this.searchRequest.routing();
}
public String preference() {
return this.searchRequest.preference();
}
@Override
public String toString() {
String sSource = "_na_";
if (searchRequest.source() != null) {
sSource = searchRequest.source().toString();
}
return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types()) + ", source[" + sSource + "]";
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvectors.dfs;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.List;
/**
* A response of a dfs only request.
*/
public class DfsOnlyResponse extends BroadcastResponse {
private AggregatedDfs dfs;
private long tookInMillis;
DfsOnlyResponse(AggregatedDfs dfs, int totalShards, int successfulShards, int failedShards,
List<ShardOperationFailedException> shardFailures, long tookInMillis) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.dfs = dfs;
this.tookInMillis = tookInMillis;
}
public AggregatedDfs getDfs() {
return dfs;
}
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
public long getTookInMillis() {
return tookInMillis;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
AggregatedDfs.readAggregatedDfs(in);
tookInMillis = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
dfs.writeTo(out);
out.writeVLong(tookInMillis);
}
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvectors.dfs;
import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import java.io.IOException;
public class ShardDfsOnlyRequest extends BroadcastShardRequest {
private ShardSearchTransportRequest shardSearchRequest = new ShardSearchTransportRequest();
public ShardDfsOnlyRequest() {
}
ShardDfsOnlyRequest(ShardRouting shardRouting, int numberOfShards, @Nullable String[] filteringAliases, long nowInMillis, DfsOnlyRequest request) {
super(shardRouting.shardId(), request);
this.shardSearchRequest = new ShardSearchTransportRequest(request.getSearchRequest(), shardRouting, numberOfShards,
filteringAliases, nowInMillis);
}
public ShardSearchRequest getShardSearchRequest() {
return shardSearchRequest;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardSearchRequest.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardSearchRequest.writeTo(out);
}
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvectors.dfs;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.dfs.DfsSearchResult;
import java.io.IOException;
/**
*
*/
class ShardDfsOnlyResponse extends BroadcastShardResponse {
private DfsSearchResult dfsSearchResult = new DfsSearchResult();
ShardDfsOnlyResponse() {
}
ShardDfsOnlyResponse(ShardId shardId, DfsSearchResult dfsSearchResult) {
super(shardId);
this.dfsSearchResult = dfsSearchResult;
}
public DfsSearchResult getDfsSearchResult() {
return dfsSearchResult;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
dfsSearchResult.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
dfsSearchResult.writeTo(out);
}
}

View File

@ -1,146 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvectors.dfs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* Get the dfs only with no fetch phase. This is for internal use only.
*/
public class TransportDfsOnlyAction extends TransportBroadcastAction<DfsOnlyRequest, DfsOnlyResponse, ShardDfsOnlyRequest, ShardDfsOnlyResponse> {
public static final String NAME = "internal:index/termvectors/dfs";
private final SearchService searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportDfsOnlyAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, SearchService searchService, SearchPhaseController searchPhaseController) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
DfsOnlyRequest::new, ShardDfsOnlyRequest::new, ThreadPool.Names.SEARCH);
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@Override
protected void doExecute(Task task, DfsOnlyRequest request, ActionListener<DfsOnlyResponse> listener) {
request.nowInMillis = System.currentTimeMillis();
super.doExecute(task, request, listener);
}
@Override
protected ShardDfsOnlyRequest newShardRequest(int numShards, ShardRouting shard, DfsOnlyRequest request) {
String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterService.state(), shard.index().getName(), request.indices());
return new ShardDfsOnlyRequest(shard, numShards, filteringAliases, request.nowInMillis, request);
}
@Override
protected ShardDfsOnlyResponse newShardResponse() {
return new ShardDfsOnlyResponse();
}
@Override
protected GroupShardsIterator shards(ClusterState clusterState, DfsOnlyRequest request, String[] concreteIndices) {
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), request.indices());
return clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference());
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, DfsOnlyRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, DfsOnlyRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}
@Override
protected DfsOnlyResponse newResponse(DfsOnlyRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
AtomicArray<DfsSearchResult> dfsResults = new AtomicArray<>(shardsResponses.length());
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
dfsResults.set(i, ((ShardDfsOnlyResponse) shardResponse).getDfsSearchResult());
successfulShards++;
}
}
AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults);
return new DfsOnlyResponse(dfs, shardsResponses.length(), successfulShards, failedShards, shardFailures, buildTookInMillis(request));
}
@Override
protected ShardDfsOnlyResponse shardOperation(ShardDfsOnlyRequest request) {
DfsSearchResult dfsSearchResult = searchService.executeDfsPhase(request.getShardSearchRequest());
searchService.freeContext(dfsSearchResult.id());
return new ShardDfsOnlyResponse(request.shardId(), dfsSearchResult);
}
/**
* Builds how long it took to execute the dfs request.
*/
protected final long buildTookInMillis(DfsOnlyRequest request) {
// protect ourselves against time going backwards
// negative values don't make sense and we want to be able to serialize that thing as a vLong
return Math.max(1, System.currentTimeMillis() - request.nowInMillis);
}
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Distributed frequencies.
*/
package org.elasticsearch.action.termvectors.dfs;

View File

@ -287,8 +287,7 @@ public class MoreLikeThisQueryBuilder extends AbstractQueryBuilder<MoreLikeThisQ
.offsets(false) .offsets(false)
.payloads(false) .payloads(false)
.fieldStatistics(false) .fieldStatistics(false)
.termStatistics(false) .termStatistics(false);
.dfs(false);
// for artificial docs to make sure that the id has changed in the item too // for artificial docs to make sure that the id has changed in the item too
if (doc != null) { if (doc != null) {
termVectorsRequest.doc(doc, true); termVectorsRequest.doc(doc, true);

View File

@ -31,14 +31,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.termvectors.TermVectorsFilter; import org.elasticsearch.action.termvectors.TermVectorsFilter;
import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse; import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.action.termvectors.dfs.DfsOnlyRequest;
import org.elasticsearch.action.termvectors.dfs.DfsOnlyResponse;
import org.elasticsearch.action.termvectors.dfs.TransportDfsOnlyAction;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.get.GetField;
@ -71,14 +66,6 @@ import static org.elasticsearch.index.mapper.SourceToParse.source;
public class TermVectorsService { public class TermVectorsService {
private final TransportDfsOnlyAction dfsAction;
@Inject
public TermVectorsService(TransportDfsOnlyAction dfsAction) {
this.dfsAction = dfsAction;
}
public TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequest request) { public TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequest request) {
final TermVectorsResponse termVectorsResponse = new TermVectorsResponse(indexShard.shardId().getIndex().getName(), request.type(), request.id()); final TermVectorsResponse termVectorsResponse = new TermVectorsResponse(indexShard.shardId().getIndex().getName(), request.type(), request.id());
final Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id())); final Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
@ -137,10 +124,6 @@ public class TermVectorsService {
} }
/* if there are term vectors, optional compute dfs and/or terms filtering */ /* if there are term vectors, optional compute dfs and/or terms filtering */
if (termVectorsByField != null) { if (termVectorsByField != null) {
if (useDfs(request)) {
dfs = getAggregatedDfs(termVectorsByField, request);
}
if (request.filterSettings() != null) { if (request.filterSettings() != null) {
termVectorsFilter = new TermVectorsFilter(termVectorsByField, topLevelFields, request.selectedFields(), dfs); termVectorsFilter = new TermVectorsFilter(termVectorsByField, topLevelFields, request.selectedFields(), dfs);
termVectorsFilter.setSettings(request.filterSettings()); termVectorsFilter.setSettings(request.filterSettings());
@ -346,14 +329,4 @@ public class TermVectorsService {
} }
} }
private boolean useDfs(TermVectorsRequest request) {
return request.dfs() && (request.fieldStatistics() || request.termStatistics());
}
private AggregatedDfs getAggregatedDfs(Fields termVectorsFields, TermVectorsRequest request) throws IOException {
DfsOnlyRequest dfsOnlyRequest = new DfsOnlyRequest(termVectorsFields, new String[]{request.index()},
new String[]{request.type()}, request.selectedFields());
DfsOnlyResponse response = dfsAction.execute(dfsOnlyRequest).actionGet();
return response.getDfs();
}
} }

View File

@ -91,7 +91,6 @@ public class RestTermVectorsAction extends BaseRestHandler {
termVectorsRequest.termStatistics(request.paramAsBoolean("term_statistics", termVectorsRequest.termStatistics())); termVectorsRequest.termStatistics(request.paramAsBoolean("term_statistics", termVectorsRequest.termStatistics()));
termVectorsRequest.fieldStatistics(request.paramAsBoolean("fieldStatistics", termVectorsRequest.fieldStatistics())); termVectorsRequest.fieldStatistics(request.paramAsBoolean("fieldStatistics", termVectorsRequest.fieldStatistics()));
termVectorsRequest.fieldStatistics(request.paramAsBoolean("field_statistics", termVectorsRequest.fieldStatistics())); termVectorsRequest.fieldStatistics(request.paramAsBoolean("field_statistics", termVectorsRequest.fieldStatistics()));
termVectorsRequest.dfs(request.paramAsBoolean("dfs", termVectorsRequest.dfs()));
} }
static public void addFieldStringsFromParameter(TermVectorsRequest termVectorsRequest, String fields) { static public void addFieldStringsFromParameter(TermVectorsRequest termVectorsRequest, String fields) {

View File

@ -134,8 +134,7 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
ActionFuture<TermVectorsResponse> termVectors = client().termVectors(new TermVectorsRequest(indexOrAlias(), "type1", "0") ActionFuture<TermVectorsResponse> termVectors = client().termVectors(new TermVectorsRequest(indexOrAlias(), "type1", "0")
.selectedFields(randomBoolean() ? new String[]{"existingfield"} : null) .selectedFields(randomBoolean() ? new String[]{"existingfield"} : null)
.termStatistics(true) .termStatistics(true)
.fieldStatistics(true) .fieldStatistics(true));
.dfs(true));
// lets see if the null term vectors are caught... // lets see if the null term vectors are caught...
TermVectorsResponse actionGet = termVectors.actionGet(); TermVectorsResponse actionGet = termVectors.actionGet();
@ -966,95 +965,6 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
return randomBoolean() ? "test" : "alias"; return randomBoolean() ? "test" : "alias";
} }
public void testDfs() throws ExecutionException, InterruptedException, IOException {
logger.info("Setting up the index ...");
Settings.Builder settings = settingsBuilder()
.put(indexSettings())
.put("index.analysis.analyzer", "standard")
.put("index.number_of_shards", randomIntBetween(2, 10)); // we need at least 2 shards
assertAcked(prepareCreate("test")
.setSettings(settings)
.addMapping("type1", "text", "type=string"));
ensureGreen();
int numDocs = scaledRandomIntBetween(25, 100);
logger.info("Indexing {} documents...", numDocs);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
builders.add(client().prepareIndex("test", "type1", i + "").setSource("text", "cat"));
}
indexRandom(true, builders);
XContentBuilder expectedStats = jsonBuilder()
.startObject()
.startObject("text")
.startObject("field_statistics")
.field("sum_doc_freq", numDocs)
.field("doc_count", numDocs)
.field("sum_ttf", numDocs)
.endObject()
.startObject("terms")
.startObject("cat")
.field("doc_freq", numDocs)
.field("ttf", numDocs)
.endObject()
.endObject()
.endObject()
.endObject();
logger.info("Without dfs 'cat' should appear strictly less than {} times.", numDocs);
TermVectorsResponse response = client().prepareTermVectors("test", "type1", randomIntBetween(0, numDocs - 1) + "")
.setSelectedFields("text")
.setFieldStatistics(true)
.setTermStatistics(true)
.get();
checkStats(response.getFields(), expectedStats, false);
logger.info("With dfs 'cat' should appear exactly {} times.", numDocs);
response = client().prepareTermVectors("test", "type1", randomIntBetween(0, numDocs - 1) + "")
.setSelectedFields("text")
.setFieldStatistics(true)
.setTermStatistics(true)
.setDfs(true)
.get();
checkStats(response.getFields(), expectedStats, true);
}
private void checkStats(Fields fields, XContentBuilder xContentBuilder, boolean isEqual) throws IOException {
Map<String, Object> stats = JsonXContent.jsonXContent.createParser(xContentBuilder.bytes()).map();
assertThat("number of fields expected:", fields.size(), equalTo(stats.size()));
for (String fieldName : fields) {
logger.info("Checking field statistics for field: {}", fieldName);
Terms terms = fields.terms(fieldName);
Map<String, Integer> fieldStatistics = getFieldStatistics(stats, fieldName);
String msg = "field: " + fieldName + " ";
assertThat(msg + "sum_doc_freq:",
(int) terms.getSumDocFreq(),
equalOrLessThanTo(fieldStatistics.get("sum_doc_freq"), isEqual));
assertThat(msg + "doc_count:",
terms.getDocCount(),
equalOrLessThanTo(fieldStatistics.get("doc_count"), isEqual));
assertThat(msg + "sum_ttf:",
(int) terms.getSumTotalTermFreq(),
equalOrLessThanTo(fieldStatistics.get("sum_ttf"), isEqual));
final TermsEnum termsEnum = terms.iterator();
BytesRef text;
while((text = termsEnum.next()) != null) {
String term = text.utf8ToString();
logger.info("Checking term statistics for term: ({}, {})", fieldName, term);
Map<String, Integer> termStatistics = getTermStatistics(stats, fieldName, term);
msg = "term: (" + fieldName + "," + term + ") ";
assertThat(msg + "doc_freq:",
termsEnum.docFreq(),
equalOrLessThanTo(termStatistics.get("doc_freq"), isEqual));
assertThat(msg + "ttf:",
(int) termsEnum.totalTermFreq(),
equalOrLessThanTo(termStatistics.get("ttf"), isEqual));
}
}
}
private Map<String, Integer> getFieldStatistics(Map<String, Object> stats, String fieldName) throws IOException { private Map<String, Integer> getFieldStatistics(Map<String, Object> stats, String fieldName) throws IOException {
return (Map<String, Integer>) ((Map<String, Object>) stats.get(fieldName)).get("field_statistics"); return (Map<String, Integer>) ((Map<String, Object>) stats.get(fieldName)).get("field_statistics");
} }

View File

@ -34,12 +34,6 @@
"default" : true, "default" : true,
"required" : false "required" : false
}, },
"dfs" : {
"type" : "boolean",
"description" : "Specifies if distributed frequencies should be returned instead shard frequencies.",
"default" : false,
"required" : false
},
"fields" : { "fields" : {
"type" : "list", "type" : "list",
"description" : "A comma-separated list of fields to return.", "description" : "A comma-separated list of fields to return.",