Term Vectors: support for distributed frequencies

Adds distributed frequencies support for the Term Vectors API. A new parameter
called `dfs` is introduced which defaults to `false`.

Closes #8144
This commit is contained in:
Alex Ksikes 2014-10-16 19:46:47 +02:00
parent 19514a2ef4
commit c13f5f21de
17 changed files with 774 additions and 20 deletions

View File

@ -76,16 +76,23 @@ omit :
* sum of total term frequencies (the sum of total term frequencies of
each term in this field)
[float]
==== Distributed frequencies coming[1.5.0]
Setting `dfs` to `true` (default is `false`) will return the term statistics
or the field statistics of the entire index, and not just those of the shard. Use it
with caution, as distributed frequencies can have a serious performance impact.
[float]
=== Behaviour
The term and field statistics are not accurate. Deleted documents
are not taken into account. The information is only retrieved for the
shard the requested document resides in. The term and field statistics
are therefore only useful as relative measures whereas the absolute
numbers have no meaning in this context. By default, when requesting
term vectors of artificial documents, a shard to get the statistics from
is randomly selected. Use `routing` only to hit a particular shard.
shard the requested document resides in, unless `dfs` is set to `true`.
The term and field statistics are therefore only useful as relative measures
whereas the absolute numbers have no meaning in this context. By default,
when requesting term vectors of artificial documents, a shard to get the statistics
from is randomly selected. Use `routing` only to hit a particular shard.
[float]
=== Example 1

View File

@ -35,6 +35,12 @@
"default" : true,
"required" : false
},
"dfs" : {
"type" : "boolean",
"description" : "Specifies if distributed frequencies should be returned instead of shard frequencies.",
"default" : false,
"required" : false
},
"fields" : {
"type" : "list",
"description" : "A comma-separated list of fields to return.",

View File

@ -160,6 +160,7 @@ import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.termvector.*;
import org.elasticsearch.action.termvector.dfs.TransportDfsOnlyAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.common.inject.AbstractModule;
@ -280,7 +281,8 @@ public class ActionModule extends AbstractModule {
registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
registerAction(GetAction.INSTANCE, TransportGetAction.class);
registerAction(TermVectorAction.INSTANCE, TransportSingleShardTermVectorAction.class);
registerAction(TermVectorAction.INSTANCE, TransportSingleShardTermVectorAction.class,
TransportDfsOnlyAction.class);
registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportSingleShardMultiTermsVectorAction.class);
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class,

View File

@ -292,6 +292,22 @@ public class TermVectorRequest extends SingleShardOperationRequest<TermVectorReq
return this;
}
/**
 * @return <code>true</code> if distributed frequencies should be returned. Otherwise
 * <code>false</code>
 */
public boolean dfs() {
    // "dfs" is encoded as a flag alongside positions/offsets/payloads/statistics
    return flagsEnum.contains(Flag.Dfs);
}
/**
 * Use distributed frequencies instead of shard statistics.
 *
 * @param dfs <code>true</code> to compute term/field statistics over the whole
 *            index instead of only the local shard
 * @return this request, for chaining
 */
public TermVectorRequest dfs(boolean dfs) {
    setFlag(Flag.Dfs, dfs);
    return this;
}
/**
* Return only term vectors for special selected fields. Returns for term
* vectors for all fields if selectedFields == null
@ -309,24 +325,30 @@ public class TermVectorRequest extends SingleShardOperationRequest<TermVectorReq
return this;
}
/**
 * Return whether term vectors should be generated real-time (default to true).
 */
public boolean realtime() {
    // an unset value means the default: real-time
    if (this.realtime == null) {
        return true;
    }
    return this.realtime;
}
/**
 * Choose whether term vectors be generated real-time.
 *
 * @param realtime <code>null</code> means "use the default" (real-time)
 * @return this request, for chaining
 */
public TermVectorRequest realtime(Boolean realtime) {
    this.realtime = realtime;
    return this;
}
/**
 * Return the overridden analyzers at each field.
 *
 * @return map of field name to analyzer name, or <code>null</code> when no overrides are set
 */
public Map<String, String> perFieldAnalyzer() {
    return perFieldAnalyzer;
}
/**
* Override the analyzer used at each field when generating term vectors
* Override the analyzer used at each field when generating term vectors.
*/
public TermVectorRequest perFieldAnalyzer(Map<String, String> perFieldAnalyzer) {
this.perFieldAnalyzer = perFieldAnalyzer != null && perFieldAnalyzer.size() != 0 ? Maps.newHashMap(perFieldAnalyzer) : null;
@ -444,7 +466,7 @@ public class TermVectorRequest extends SingleShardOperationRequest<TermVectorReq
public static enum Flag {
// Do not change the order of these flags we use
// the ordinal for encoding! Only append to the end!
Positions, Offsets, Payloads, FieldStatistics, TermStatistics
Positions, Offsets, Payloads, FieldStatistics, TermStatistics, Dfs
}
/**
@ -477,6 +499,8 @@ public class TermVectorRequest extends SingleShardOperationRequest<TermVectorReq
termVectorRequest.termStatistics(parser.booleanValue());
} else if (currentFieldName.equals("field_statistics") || currentFieldName.equals("fieldStatistics")) {
termVectorRequest.fieldStatistics(parser.booleanValue());
} else if (currentFieldName.equals("dfs")) {
termVectorRequest.dfs(parser.booleanValue());
} else if (currentFieldName.equals("per_field_analyzer") || currentFieldName.equals("perFieldAnalyzer")) {
termVectorRequest.perFieldAnalyzer(readPerFieldAnalyzer(parser.map()));
} else if ("_index".equals(currentFieldName)) { // the following is important for multi request parsing.

View File

@ -27,6 +27,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.util.Map;
/**
* The builder class for a term vector request.
* Returns the term vector (doc frequency, positions, offsets) for a document.
* <p/>
* Note, the {@code index}, {@code type} and {@code id} are
* required.
*/
public class TermVectorRequestBuilder extends ActionRequestBuilder<TermVectorRequest, TermVectorResponse, TermVectorRequestBuilder, Client> {
@ -34,6 +39,11 @@ public class TermVectorRequestBuilder extends ActionRequestBuilder<TermVectorReq
super(client, new TermVectorRequest());
}
/**
 * Constructs a new term vector request builder for a document that will be fetched
 * from the provided index. Use {@code index}, {@code type} and
 * {@code id} to specify the document to load.
 */
public TermVectorRequestBuilder(Client client, String index, String type, String id) {
    super(client, new TermVectorRequest(index, type, id));
}
@ -92,47 +102,81 @@ public class TermVectorRequestBuilder extends ActionRequestBuilder<TermVectorReq
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public TermVectorRequestBuilder setPreference(String preference) {
    // delegates straight to the underlying request
    request.preference(preference);
    return this;
}
/**
 * Sets whether to return the start and stop offsets for each term if they were stored or
 * skip offsets.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setOffsets(boolean offsets) {
    request.offsets(offsets);
    return this;
}
/**
 * Sets whether to return the positions for each term if stored or skip.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setPositions(boolean positions) {
    request.positions(positions);
    return this;
}
/**
 * Sets whether to return the payloads for each term or skip.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setPayloads(boolean payloads) {
    request.payloads(payloads);
    return this;
}
/**
 * Sets whether to return the term statistics for each term in the shard or skip.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setTermStatistics(boolean termStatistics) {
    request.termStatistics(termStatistics);
    return this;
}
/**
 * Sets whether to return the field statistics for each term in the shard or skip.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setFieldStatistics(boolean fieldStatistics) {
    request.fieldStatistics(fieldStatistics);
    return this;
}
/**
 * Sets whether to use distributed frequencies (index-wide statistics)
 * instead of shard statistics.
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setDfs(boolean dfs) {
    request.dfs(dfs);
    return this;
}
/**
 * Sets whether to return only term vectors for special selected fields. Returns the term
 * vectors for all fields if selectedFields == null
 *
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setSelectedFields(String... fields) {
    request.selectedFields(fields);
    return this;
}
/**
 * Sets whether term vectors are generated real-time.
 *
 * @param realtime <code>null</code> means "use the default"
 * @return this builder, for chaining
 */
public TermVectorRequestBuilder setRealtime(Boolean realtime) {
    request.realtime(realtime);
    return this;
}
/**
* Sets the analyzer used at each field when generating term vectors.
*/
public TermVectorRequestBuilder setPerFieldAnalyzer(Map<String, String> perFieldAnalyzer) {
request.perFieldAnalyzer(perFieldAnalyzer);
return this;

View File

@ -30,6 +30,7 @@ import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.termvector.TermVectorRequest.Flag;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -38,6 +39,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.EnumSet;
@ -320,10 +322,14 @@ public class TermVectorResponse extends ActionResponse implements ToXContent {
}
/**
 * Convenience overload that writes the term vectors without distributed
 * frequencies (delegates with a <code>null</code> aggregated dfs).
 */
public void setFields(Fields termVectorsByField, Set<String> selectedFields, EnumSet<Flag> flags, Fields topLevelFields) throws IOException {
    setFields(termVectorsByField, selectedFields, flags, topLevelFields, null);
}
public void setFields(Fields termVectorsByField, Set<String> selectedFields, EnumSet<Flag> flags, Fields topLevelFields, @Nullable AggregatedDfs dfs) throws IOException {
TermVectorWriter tvw = new TermVectorWriter(this);
if (termVectorsByField != null) {
tvw.setFields(termVectorsByField, selectedFields, flags, topLevelFields);
tvw.setFields(termVectorsByField, selectedFields, flags, topLevelFields, dfs);
}
}

View File

@ -19,10 +19,14 @@
package org.elasticsearch.action.termvector;
import org.apache.lucene.index.*;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.termvector.TermVectorRequest.Flag;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.ArrayList;
@ -45,7 +49,7 @@ final class TermVectorWriter {
response = termVectorResponse;
}
void setFields(Fields termVectorsByField, Set<String> selectedFields, EnumSet<Flag> flags, Fields topLevelFields) throws IOException {
void setFields(Fields termVectorsByField, Set<String> selectedFields, EnumSet<Flag> flags, Fields topLevelFields, @Nullable AggregatedDfs dfs) throws IOException {
int numFieldsWritten = 0;
TermsEnum iterator = null;
DocsAndPositionsEnum docsAndPosEnum = null;
@ -70,8 +74,12 @@ final class TermVectorWriter {
boolean payloads = flags.contains(Flag.Payloads) && fieldTermVector.hasPayloads();
startField(field, fieldTermVector.size(), positions, offsets, payloads);
if (flags.contains(Flag.FieldStatistics)) {
if (dfs != null) {
writeFieldStatistics(dfs.fieldStatistics().get(field));
} else {
writeFieldStatistics(topLevelTerms);
}
}
iterator = fieldTermVector.iterator(iterator);
final boolean useDocsAndPos = positions || offsets || payloads;
while (iterator.next() != null) { // iterate all terms of the
@ -81,8 +89,12 @@ final class TermVectorWriter {
boolean foundTerm = topLevelIterator.seekExact(term);
startTerm(term);
if (flags.contains(Flag.TermStatistics)) {
if (dfs != null) {
writeTermStatistics(dfs.termStatistics().get(new Term(field, term.utf8ToString())));
} else {
writeTermStatistics(topLevelIterator);
}
}
if (useDocsAndPos) {
// given we have pos or offsets
docsAndPosEnum = writeTermWithDocsAndPos(iterator, docsAndPosEnum, positions, offsets, payloads);
@ -161,7 +173,6 @@ final class TermVectorWriter {
}
// Writes the within-document frequency of the current term to the output.
// Uses the "potentially negative" encoding since -1 is used elsewhere to mark absent values.
private void writeFreq(int termFreq) throws IOException {
    writePotentiallyNegativeVInt(termFreq);
}
@ -205,7 +216,15 @@ final class TermVectorWriter {
long ttf = topLevelIterator.totalTermFreq();
assert (ttf >= -1);
writePotentiallyNegativeVLong(ttf);
}
/**
 * Writes term statistics taken from an aggregated (distributed) source rather
 * than from a shard-local terms enum. Wire format and order must match the
 * shard-local overload exactly: docFreq (vInt) then totalTermFreq (vLong).
 * Values may be -1 when unavailable, hence the "potentially negative" encoding.
 */
private void writeTermStatistics(TermStatistics termStatistics) throws IOException {
    int docFreq = (int) termStatistics.docFreq();  // narrowed from long; assumes it fits an int — TODO confirm
    assert (docFreq >= -1);
    writePotentiallyNegativeVInt(docFreq);
    long ttf = termStatistics.totalTermFreq();
    assert (ttf >= -1);
    writePotentiallyNegativeVLong(ttf);
}
private void writeFieldStatistics(Terms topLevelTerms) throws IOException {
@ -218,7 +237,18 @@ final class TermVectorWriter {
int dc = topLevelTerms.getDocCount();
assert (dc >= -1);
writePotentiallyNegativeVInt(dc);
}
/**
 * Writes field statistics taken from an aggregated (distributed) source.
 * Wire format and order must match the shard-local overload exactly:
 * sumTotalTermFreq (vLong), sumDocFreq (vLong), docCount (vInt).
 * Values may be -1 when unavailable, hence the "potentially negative" encoding.
 */
private void writeFieldStatistics(CollectionStatistics fieldStats) throws IOException {
    long sttf = fieldStats.sumTotalTermFreq();
    assert (sttf >= -1);
    writePotentiallyNegativeVLong(sttf);
    long sdf = fieldStats.sumDocFreq();
    assert (sdf >= -1);
    writePotentiallyNegativeVLong(sdf);
    int dc = (int) fieldStats.docCount();  // narrowed from long; assumes it fits an int — TODO confirm
    assert (dc >= -1);
    writePotentiallyNegativeVInt(dc);
}
private void writePotentiallyNegativeVInt(int value) throws IOException {

View File

@ -0,0 +1,121 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvector.dfs;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
/**
 * A broadcast request that runs only the dfs phase of a search in order to
 * obtain index-wide statistics for all terms of the given term vectors. Used
 * by the term vectors API when distributed frequencies ({@code dfs}) are requested.
 */
public class DfsOnlyRequest extends BroadcastOperationRequest<DfsOnlyRequest> {

    // Wrapped search request whose query matches every term of the supplied
    // term vectors, so the dfs phase computes statistics for exactly those terms.
    private SearchRequest searchRequest = new SearchRequest();

    // Request start time, set by the transport action; used to compute "took".
    long nowInMillis;

    DfsOnlyRequest() {
    }

    /**
     * Builds a dfs-only request from the given term vectors.
     *
     * @param termVectorFields the term vectors whose terms should be looked up
     * @param indices          the indices to run against
     * @param types            the document types to restrict the search to
     * @param selectedFields   if non-null, only terms of these fields are included
     */
    public DfsOnlyRequest(Fields termVectorFields, String[] indices, String[] types, Set<String> selectedFields) throws IOException {
        super(indices);

        // build a search request with a query of all the terms
        final BoolQueryBuilder boolBuilder = boolQuery();
        TermsEnum iterator = null; // reused across fields to avoid reallocation
        for (String fieldName : termVectorFields) {
            if ((selectedFields != null) && (!selectedFields.contains(fieldName))) {
                continue;
            }
            Terms terms = termVectorFields.terms(fieldName);
            iterator = terms.iterator(iterator);
            while (iterator.next() != null) {
                String text = iterator.term().utf8ToString();
                boolBuilder.should(QueryBuilders.termQuery(fieldName, text));
            }
        }
        // wrap a search request object
        this.searchRequest = new SearchRequest(indices).types(types).source(new SearchSourceBuilder().query(boolBuilder));
    }

    public SearchRequest getSearchRequest() {
        return searchRequest;
    }

    @Override
    public ActionRequestValidationException validate() {
        // validity is entirely determined by the wrapped search request
        return searchRequest.validate();
    }

    @Override
    protected void beforeStart() {
        searchRequest.beforeStart();
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        // wire order must mirror writeTo exactly
        this.searchRequest.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        this.searchRequest.writeTo(out);
    }

    // The following accessors delegate to the wrapped search request.

    public String[] types() {
        return this.searchRequest.types();
    }

    public String routing() {
        return this.searchRequest.routing();
    }

    public String preference() {
        return this.searchRequest.preference();
    }

    @Override
    public String toString() {
        String sSource = "_na_";
        try {
            sSource = XContentHelper.convertToJson(searchRequest.source(), false);
        } catch (IOException e) {
            // ignore: toString must not fail, fall back to the "_na_" placeholder
        }
        return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types()) + ", source[" + sSource + "]";
    }
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvector.dfs;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.List;
/**
 * A response of a dfs only request, carrying the aggregated (index-wide)
 * dfs statistics and timing information.
 */
public class DfsOnlyResponse extends BroadcastOperationResponse {

    // Aggregated term/field statistics reduced from all shard responses.
    private AggregatedDfs dfs;
    private long tookInMillis;

    DfsOnlyResponse(AggregatedDfs dfs, int totalShards, int successfulShards, int failedShards,
                    List<ShardOperationFailedException> shardFailures, long tookInMillis) {
        super(totalShards, successfulShards, failedShards, shardFailures);
        this.dfs = dfs;
        this.tookInMillis = tookInMillis;
    }

    public AggregatedDfs getDfs() {
        return dfs;
    }

    /** How long the dfs request took, as a {@link TimeValue}. */
    public TimeValue getTook() {
        return new TimeValue(tookInMillis);
    }

    public long getTookInMillis() {
        return tookInMillis;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        // BUGFIX: the deserialized dfs was previously read but discarded,
        // leaving this.dfs null after transport deserialization.
        dfs = AggregatedDfs.readAggregatedDfs(in);
        tookInMillis = in.readVLong();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        // wire order must mirror readFrom exactly
        dfs.writeTo(out);
        out.writeVLong(tookInMillis);
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvector.dfs;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import java.io.IOException;
/**
 * Shard-level request for the dfs-only action: wraps a {@link ShardSearchTransportRequest}
 * built from the node-level {@link DfsOnlyRequest} so the standard search dfs phase can run
 * on the target shard.
 */
class ShardDfsOnlyRequest extends BroadcastShardOperationRequest {

    private ShardSearchTransportRequest shardSearchRequest = new ShardSearchTransportRequest();

    ShardDfsOnlyRequest() {
    }

    // FIX: removed @Nullable from the primitive `nowInMillis` parameter — a
    // `long` can never be null, so the annotation was invalid and misleading.
    ShardDfsOnlyRequest(ShardRouting shardRouting, int numberOfShards, @Nullable String[] filteringAliases, long nowInMillis, DfsOnlyRequest request) {
        super(shardRouting.shardId(), request);
        this.shardSearchRequest = new ShardSearchTransportRequest(request.getSearchRequest(), shardRouting, numberOfShards, false,
                filteringAliases, nowInMillis);
    }

    public ShardSearchRequest getShardSearchRequest() {
        return shardSearchRequest;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        // wire order must mirror writeTo exactly
        shardSearchRequest.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        shardSearchRequest.writeTo(out);
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvector.dfs;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.dfs.DfsSearchResult;
import java.io.IOException;
/**
 * Shard-level response for the dfs-only action, carrying the raw
 * {@link DfsSearchResult} computed on that shard.
 */
class ShardDfsOnlyResponse extends BroadcastShardOperationResponse {

    private DfsSearchResult dfsSearchResult = new DfsSearchResult();

    ShardDfsOnlyResponse() {
    }

    ShardDfsOnlyResponse(ShardId shardId, DfsSearchResult dfsSearchResult) {
        super(shardId);
        this.dfsSearchResult = dfsSearchResult;
    }

    public DfsSearchResult getDfsSearchResult() {
        return dfsSearchResult;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        // wire order must mirror writeTo exactly
        dfsSearchResult.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        dfsSearchResult.writeTo(out);
    }
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.termvector.dfs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.newArrayList;
/**
 * Get the dfs only with no fetch phase. This is for internal use only:
 * broadcasts a dfs-only search to the relevant shards and reduces the
 * per-shard results into a single {@link AggregatedDfs}.
 */
public class TransportDfsOnlyAction extends TransportBroadcastOperationAction<DfsOnlyRequest, DfsOnlyResponse, ShardDfsOnlyRequest, ShardDfsOnlyResponse> {

    // Internal action name; not exposed as a public REST endpoint.
    public static final String NAME = "internal:index/termvectors/dfs";

    private final SearchService searchService;

    private final SearchPhaseController searchPhaseController;

    @Inject
    public TransportDfsOnlyAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
                                  ActionFilters actionFilters, SearchService searchService, SearchPhaseController searchPhaseController) {
        super(settings, NAME, threadPool, clusterService, transportService, actionFilters);
        this.searchService = searchService;
        this.searchPhaseController = searchPhaseController;
    }

    @Override
    protected void doExecute(DfsOnlyRequest request, ActionListener<DfsOnlyResponse> listener) {
        // record the start time so newResponse can compute the "took" value
        request.nowInMillis = System.currentTimeMillis();
        super.doExecute(request, listener);
    }

    @Override
    protected String executor() {
        // dfs is part of the search machinery, so run on the search thread pool
        return ThreadPool.Names.SEARCH;
    }

    @Override
    protected DfsOnlyRequest newRequest() {
        return new DfsOnlyRequest();
    }

    @Override
    protected ShardDfsOnlyRequest newShardRequest() {
        return new ShardDfsOnlyRequest();
    }

    @Override
    protected ShardDfsOnlyRequest newShardRequest(int numShards, ShardRouting shard, DfsOnlyRequest request) {
        // resolve alias filters so the dfs phase sees the same docs a search would
        String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
        return new ShardDfsOnlyRequest(shard, numShards, filteringAliases, request.nowInMillis, request);
    }

    @Override
    protected ShardDfsOnlyResponse newShardResponse() {
        return new ShardDfsOnlyResponse();
    }

    @Override
    protected GroupShardsIterator shards(ClusterState clusterState, DfsOnlyRequest request, String[] concreteIndices) {
        // same shard selection as a search request (honors routing and preference)
        Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
        return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
    }

    @Override
    protected ClusterBlockException checkGlobalBlock(ClusterState state, DfsOnlyRequest request) {
        return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
    }

    @Override
    protected ClusterBlockException checkRequestBlock(ClusterState state, DfsOnlyRequest countRequest, String[] concreteIndices) {
        return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
    }

    @Override
    protected DfsOnlyResponse newResponse(DfsOnlyRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
        // Reduce per-shard dfs results into one AggregatedDfs, collecting failures along the way.
        int successfulShards = 0;
        int failedShards = 0;
        List<ShardOperationFailedException> shardFailures = null;
        AtomicArray<DfsSearchResult> dfsResults = new AtomicArray<>(shardsResponses.length());
        for (int i = 0; i < shardsResponses.length(); i++) {
            Object shardResponse = shardsResponses.get(i);
            if (shardResponse == null) {
                // simply ignore non active shards
            } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                failedShards++;
                if (shardFailures == null) {
                    // allocated lazily: failures are the uncommon case
                    shardFailures = newArrayList();
                }
                shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
            } else {
                dfsResults.set(i, ((ShardDfsOnlyResponse) shardResponse).getDfsSearchResult());
                successfulShards++;
            }
        }
        AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults);
        return new DfsOnlyResponse(dfs, shardsResponses.length(), successfulShards, failedShards, shardFailures, buildTookInMillis(request));
    }

    @Override
    protected ShardDfsOnlyResponse shardOperation(ShardDfsOnlyRequest request) throws ElasticsearchException {
        // run only the dfs phase of a search, then free the search context right away
        DfsSearchResult dfsSearchResult = searchService.executeDfsPhase(request.getShardSearchRequest());
        searchService.freeContext(dfsSearchResult.id());
        return new ShardDfsOnlyResponse(request.shardId(), dfsSearchResult);
    }

    /**
     * Builds how long it took to execute the dfs request.
     */
    protected final long buildTookInMillis(DfsOnlyRequest request) {
        // protect ourselves against time going backwards
        // negative values don't make sense and we want to be able to serialize that thing as a vLong
        return Math.max(1, System.currentTimeMillis() - request.nowInMillis);
    }
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
 * Distributed frequencies: support for computing index-wide (rather than
 * shard-local) term and field statistics for the term vectors API via a
 * dfs-only broadcast action.
 */
package org.elasticsearch.action.termvector.dfs;

View File

@ -25,6 +25,9 @@ import org.apache.lucene.index.memory.MemoryIndex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.termvector.TermVectorRequest;
import org.elasticsearch.action.termvector.TermVectorResponse;
import org.elasticsearch.action.termvector.dfs.DfsOnlyRequest;
import org.elasticsearch.action.termvector.dfs.DfsOnlyResponse;
import org.elasticsearch.action.termvector.dfs.TransportDfsOnlyAction;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -44,6 +47,7 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.search.dfs.AggregatedDfs;
import java.io.IOException;
import java.util.*;
@ -57,11 +61,13 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
private IndexShard indexShard;
private final MappingUpdatedAction mappingUpdatedAction;
private final TransportDfsOnlyAction dfsAction;
@Inject
public ShardTermVectorService(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction) {
public ShardTermVectorService(ShardId shardId, @IndexSettings Settings indexSettings, MappingUpdatedAction mappingUpdatedAction, TransportDfsOnlyAction dfsAction) {
super(shardId, indexSettings);
this.mappingUpdatedAction = mappingUpdatedAction;
this.dfsAction = dfsAction;
}
// sadly, to overcome cyclic dep, we need to do this and inject it ourselves...
@ -78,6 +84,7 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
final Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), uidTerm));
boolean docFromTranslog = get.source() != null;
AggregatedDfs dfs = null;
/* fetched from translog is treated as an artificial document */
if (docFromTranslog) {
@ -100,7 +107,10 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
if (topLevelFields == null) {
topLevelFields = termVectorsByField;
}
termVectorResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields);
if (useDfs(request)) {
dfs = getAggregatedDfs(termVectorsByField, request);
}
termVectorResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields, dfs);
termVectorResponse.setExists(true);
termVectorResponse.setArtificial(!docFromTranslog);
}
@ -117,7 +127,10 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
if (selectedFields != null) {
termVectorsByField = addGeneratedTermVectors(get, termVectorsByField, request, selectedFields);
}
termVectorResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields);
if (useDfs(request)) {
dfs = getAggregatedDfs(termVectorsByField, request);
}
termVectorResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields, dfs);
termVectorResponse.setDocVersion(docIdAndVersion.version);
termVectorResponse.setExists(true);
} else {
@ -315,4 +328,14 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
}
}
/**
 * Decides whether distributed frequencies should be fetched for this request.
 * Aggregating dfs is only worthwhile when the caller asked for {@code dfs}
 * and requested at least one kind of statistics.
 */
private boolean useDfs(TermVectorRequest request) {
    if (!request.dfs()) {
        return false;
    }
    // dfs only affects the statistics sections of the response.
    return request.termStatistics() || request.fieldStatistics();
}
/**
 * Fetches index-wide term and field statistics for the given term vector
 * fields by executing a dfs-only action and blocking for its response.
 *
 * @param termVectorFields the fields whose terms need aggregated statistics
 * @param request          the originating term vectors request (supplies index, type and selected fields)
 * @return the aggregated statistics across all shards
 */
private AggregatedDfs getAggregatedDfs(Fields termVectorFields, TermVectorRequest request) throws IOException {
    String[] indices = new String[]{request.index()};
    String[] types = new String[]{request.type()};
    DfsOnlyRequest dfsOnlyRequest = new DfsOnlyRequest(termVectorFields, indices, types, request.selectedFields());
    // NOTE: blocking call; acceptable here because term vector retrieval is itself synchronous.
    return dfsAction.execute(dfsOnlyRequest).actionGet().getDfs();
}
}

View File

@ -84,6 +84,7 @@ public class RestTermVectorAction extends BaseRestHandler {
termVectorRequest.termStatistics(request.paramAsBoolean("term_statistics", termVectorRequest.termStatistics()));
termVectorRequest.fieldStatistics(request.paramAsBoolean("fieldStatistics", termVectorRequest.fieldStatistics()));
termVectorRequest.fieldStatistics(request.paramAsBoolean("field_statistics", termVectorRequest.fieldStatistics()));
termVectorRequest.dfs(request.paramAsBoolean("dfs", termVectorRequest.dfs()));
}
static public void addFieldStringsFromParameter(TermVectorRequest termVectorRequest, String fields) {

View File

@ -32,7 +32,9 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.core.AbstractFieldMapper;
import org.hamcrest.Matcher;
import org.junit.Test;
import java.io.IOException;
@ -403,7 +405,6 @@ public class GetTermVectorTests extends AbstractTermVectorTests {
@Test
public void testRandomPayloadWithDelimitedPayloadTokenFilter() throws ElasticsearchException, IOException {
//create the test document
int encoding = randomIntBetween(0, 2);
String encodingString = "";
@ -1018,4 +1019,110 @@ public class GetTermVectorTests extends AbstractTermVectorTests {
private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
@Test
public void testDfs() throws ElasticsearchException, ExecutionException, InterruptedException, IOException {
// Index the same single-term document on several shards, then verify that
// without dfs the per-shard statistics undercount, while with dfs they match
// the exact index-wide counts.
logger.info("Setting up the index ...");
ImmutableSettings.Builder settings = settingsBuilder()
.put(indexSettings())
.put("index.analysis.analyzer", "standard")
.put("index.number_of_shards", randomIntBetween(2, 10)); // we need at least 2 shards
assertAcked(prepareCreate("test")
.setSettings(settings)
.addMapping("type1", "text", "type=string"));
ensureGreen();
int numDocs = scaledRandomIntBetween(25, 100);
logger.info("Indexing {} documents...", numDocs);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
// Every document has the same one-word content, so index-wide
// doc_freq == ttf == sum_doc_freq == sum_ttf == doc_count == numDocs.
builders.add(client().prepareIndex("test", "type1", i + "").setSource("text", "cat"));
}
indexRandom(true, builders);
// Expected index-wide statistics, expressed in the same JSON shape that
// checkStats() parses back into nested maps.
XContentBuilder expectedStats = jsonBuilder()
.startObject()
.startObject("text")
.startObject("field_statistics")
.field("sum_doc_freq", numDocs)
.field("doc_count", numDocs)
.field("sum_ttf", numDocs)
.endObject()
.startObject("terms")
.startObject("cat")
.field("doc_freq", numDocs)
.field("ttf", numDocs)
.endObject()
.endObject()
.endObject()
.endObject();
// With >= 2 shards, a single shard holds only a subset of the docs, so
// shard-local statistics must be strictly below the index-wide values.
logger.info("Without dfs 'cat' should appear strictly less than {} times.", numDocs);
TermVectorResponse response = client().prepareTermVector("test", "type1", randomIntBetween(0, numDocs - 1) + "")
.setSelectedFields("text")
.setFieldStatistics(true)
.setTermStatistics(true)
.get();
checkStats(response.getFields(), expectedStats, false);
// With dfs=true the statistics are aggregated across shards and must be exact.
logger.info("With dfs 'cat' should appear exactly {} times.", numDocs);
response = client().prepareTermVector("test", "type1", randomIntBetween(0, numDocs - 1) + "")
.setSelectedFields("text")
.setFieldStatistics(true)
.setTermStatistics(true)
.setDfs(true)
.get();
checkStats(response.getFields(), expectedStats, true);
}
/**
 * Compares the statistics in the returned term vector {@code fields} against the
 * expected index-wide statistics encoded in {@code xContentBuilder}.
 *
 * @param fields          the fields returned by the term vectors response
 * @param xContentBuilder expected stats as JSON: {field -> {field_statistics, terms -> {term -> stats}}}
 * @param isEqual         when {@code true} (dfs on) actual values must equal the expected ones;
 *                        when {@code false} (dfs off) they must be strictly less
 */
private void checkStats(Fields fields, XContentBuilder xContentBuilder, boolean isEqual) throws IOException {
// Round-trip the expected stats through JSON into nested maps for lookup.
Map<String, Object> stats = JsonXContent.jsonXContent.createParser(xContentBuilder.bytes()).map();
assertThat("number of fields expected:", fields.size(), equalTo(stats.size()));
for (String fieldName : fields) {
logger.info("Checking field statistics for field: {}", fieldName);
Terms terms = fields.terms(fieldName);
Map<String, Integer> fieldStatistics = getFieldStatistics(stats, fieldName);
String msg = "field: " + fieldName + " ";
// Field-level stats: sum_doc_freq, doc_count, sum_ttf.
assertThat(msg + "sum_doc_freq:",
(int) terms.getSumDocFreq(),
equalOrLessThanTo(fieldStatistics.get("sum_doc_freq"), isEqual));
assertThat(msg + "doc_count:",
terms.getDocCount(),
equalOrLessThanTo(fieldStatistics.get("doc_count"), isEqual));
assertThat(msg + "sum_ttf:",
(int) terms.getSumTotalTermFreq(),
equalOrLessThanTo(fieldStatistics.get("sum_ttf"), isEqual));
// Term-level stats: walk every term in the field and check doc_freq and ttf.
final TermsEnum termsEnum = terms.iterator(null);
BytesRef text;
while((text = termsEnum.next()) != null) {
String term = text.utf8ToString();
logger.info("Checking term statistics for term: ({}, {})", fieldName, term);
Map<String, Integer> termStatistics = getTermStatistics(stats, fieldName, term);
msg = "term: (" + fieldName + "," + term + ") ";
assertThat(msg + "doc_freq:",
termsEnum.docFreq(),
equalOrLessThanTo(termStatistics.get("doc_freq"), isEqual));
assertThat(msg + "ttf:",
(int) termsEnum.totalTermFreq(),
equalOrLessThanTo(termStatistics.get("ttf"), isEqual));
}
}
}
/**
 * Extracts the expected {@code field_statistics} map for {@code fieldName}
 * from the parsed expected-stats JSON.
 * <p>
 * Fix: dropped the needless {@code throws IOException} — nothing in the body
 * can throw it, and the sibling {@link #getTermStatistics} declares none.
 */
@SuppressWarnings("unchecked") // parsed JSON is untyped; the test fixture guarantees this shape
private Map<String, Integer> getFieldStatistics(Map<String, Object> stats, String fieldName) {
    return (Map<String, Integer>) ((Map<String, Object>) stats.get(fieldName)).get("field_statistics");
}
/**
 * Extracts the expected statistics map for one term of one field from the
 * parsed expected-stats JSON ({field -> {"terms" -> {term -> stats}}}).
 */
private Map<String, Integer> getTermStatistics(Map<String, Object> stats, String fieldName, String term) {
    Map<String, Object> fieldEntry = (Map<String, Object>) stats.get(fieldName);
    Map<String, Object> termsEntry = (Map<String, Object>) fieldEntry.get("terms");
    return (Map<String, Integer>) termsEntry.get(term);
}
/**
 * Builds the matcher for a statistic: an exact match when dfs supplied
 * index-wide values, a strict upper bound when only shard-local values
 * were available.
 */
private Matcher<Integer> equalOrLessThanTo(Integer value, boolean isEqual) {
    return isEqual ? equalTo(value) : lessThan(value);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.bench.BenchmarkAction;
import org.elasticsearch.action.bench.BenchmarkService;
import org.elasticsearch.action.bench.BenchmarkStatusAction;
import org.elasticsearch.action.exists.ExistsAction;
import org.elasticsearch.action.termvector.dfs.TransportDfsOnlyAction;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.repositories.VerifyNodeRepositoryAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -146,5 +147,7 @@ public class ActionNamesTests extends ElasticsearchIntegrationTest {
post_1_4_actions.add(SearchServiceTransportAction.FETCH_ID_SCROLL_ACTION_NAME);
post_1_4_actions.add(VerifyRepositoryAction.NAME);
post_1_4_actions.add(VerifyNodeRepositoryAction.ACTION_NAME);
post_1_4_actions.add(TransportDfsOnlyAction.NAME);
post_1_4_actions.add(TransportDfsOnlyAction.NAME + "[s]");
}
}