terms facet on an IP field returns terms as numbers, not IPs, closes #678.

This commit is contained in:
kimchy 2011-02-09 21:37:42 +02:00
parent 0e595532f7
commit b26d86293f
4 changed files with 505 additions and 1 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.search.facet.terms.bytes.InternalByteTermsFacet;
import org.elasticsearch.search.facet.terms.doubles.InternalDoubleTermsFacet;
import org.elasticsearch.search.facet.terms.floats.InternalFloatTermsFacet;
import org.elasticsearch.search.facet.terms.ints.InternalIntTermsFacet;
import org.elasticsearch.search.facet.terms.ip.InternalIpTermsFacet;
import org.elasticsearch.search.facet.terms.longs.InternalLongTermsFacet;
import org.elasticsearch.search.facet.terms.shorts.InternalShortTermsFacet;
import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet;
@ -44,6 +45,7 @@ public abstract class InternalTermsFacet implements TermsFacet, InternalFacet {
InternalFloatTermsFacet.registerStream();
InternalShortTermsFacet.registerStream();
InternalByteTermsFacet.registerStream();
InternalIpTermsFacet.registerStream();
}
public abstract Facet reduce(String name, List<Facet> facets);

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.xcontent.ip.IpFieldMapper;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetCollector;
import org.elasticsearch.search.facet.FacetProcessor;
@ -36,6 +37,7 @@ import org.elasticsearch.search.facet.terms.doubles.TermsDoubleFacetCollector;
import org.elasticsearch.search.facet.terms.floats.TermsFloatFacetCollector;
import org.elasticsearch.search.facet.terms.index.IndexNameFacetCollector;
import org.elasticsearch.search.facet.terms.ints.TermsIntFacetCollector;
import org.elasticsearch.search.facet.terms.ip.TermsIpFacetCollector;
import org.elasticsearch.search.facet.terms.longs.TermsLongFacetCollector;
import org.elasticsearch.search.facet.terms.shorts.TermsShortFacetCollector;
import org.elasticsearch.search.facet.terms.strings.FieldsTermsStringFacetCollector;
@ -139,7 +141,9 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce
FieldMapper fieldMapper = context.mapperService().smartNameFieldMapper(field);
if (fieldMapper != null) {
if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) {
if (fieldMapper instanceof IpFieldMapper) {
return new TermsIpFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params);
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) {
return new TermsLongFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params);
} else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.DOUBLE) {
return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params);

View File

@ -0,0 +1,262 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.ip;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TLongIntIterator;
import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.mapper.xcontent.ip.IpFieldMapper;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.terms.InternalTermsFacet;
import org.elasticsearch.search.facet.terms.TermsFacet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class InternalIpTermsFacet extends InternalTermsFacet {
private static final String STREAM_TYPE = "ipTerms";
public static void registerStream() {
Streams.registerStream(STREAM, STREAM_TYPE);
}
static Stream STREAM = new Stream() {
@Override public Facet readFacet(String type, StreamInput in) throws IOException {
return readTermsFacet(in);
}
};
@Override public String streamType() {
return STREAM_TYPE;
}
public static class LongEntry implements Entry {
long term;
int count;
public LongEntry(long term, int count) {
this.term = term;
this.count = count;
}
public String term() {
return IpFieldMapper.longToIp(term);
}
public String getTerm() {
return term();
}
@Override public Number termAsNumber() {
return term;
}
@Override public Number getTermAsNumber() {
return termAsNumber();
}
public int count() {
return count;
}
public int getCount() {
return count();
}
@Override public int compareTo(Entry o) {
long anotherVal = ((LongEntry) o).term;
if (term < anotherVal) {
return -1;
}
if (term == anotherVal) {
int i = count - o.count();
if (i == 0) {
i = System.identityHashCode(this) - System.identityHashCode(o);
}
return i;
}
return 1;
}
}
private String name;
int requiredSize;
long missing;
Collection<LongEntry> entries = ImmutableList.of();
ComparatorType comparatorType;
InternalIpTermsFacet() {
}
public InternalIpTermsFacet(String name, ComparatorType comparatorType, int requiredSize, Collection<LongEntry> entries, long missing) {
this.name = name;
this.comparatorType = comparatorType;
this.requiredSize = requiredSize;
this.entries = entries;
this.missing = missing;
}
@Override public String name() {
return this.name;
}
@Override public String getName() {
return this.name;
}
@Override public String type() {
return TYPE;
}
@Override public String getType() {
return type();
}
@Override public List<LongEntry> entries() {
if (!(entries instanceof List)) {
entries = ImmutableList.copyOf(entries);
}
return (List<LongEntry>) entries;
}
@Override public List<LongEntry> getEntries() {
return entries();
}
@SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
return (Iterator) entries.iterator();
}
@Override public long missingCount() {
return this.missing;
}
@Override public long getMissingCount() {
return missingCount();
}
private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
@Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
}
};
@Override public Facet reduce(String name, List<Facet> facets) {
if (facets.size() == 1) {
return facets.get(0);
}
InternalIpTermsFacet first = (InternalIpTermsFacet) facets.get(0);
TLongIntHashMap aggregated = aggregateCache.get().get();
aggregated.clear();
long missing = 0;
for (Facet facet : facets) {
InternalIpTermsFacet mFacet = (InternalIpTermsFacet) facet;
missing += mFacet.missingCount();
for (LongEntry entry : mFacet.entries) {
aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
}
}
BoundedTreeSet<LongEntry> ordered = new BoundedTreeSet<LongEntry>(first.comparatorType.comparator(), first.requiredSize);
for (TLongIntIterator it = aggregated.iterator(); it.hasNext();) {
it.advance();
ordered.add(new LongEntry(it.key(), it.value()));
}
first.entries = ordered;
first.missing = missing;
return first;
}
static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
static final XContentBuilderString TERMS = new XContentBuilderString("terms");
static final XContentBuilderString TERM = new XContentBuilderString("term");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
builder.field(Fields._TYPE, TermsFacet.TYPE);
builder.field(Fields.MISSING, missing);
builder.startArray(Fields.TERMS);
for (LongEntry entry : entries) {
builder.startObject();
builder.field(Fields.TERM, entry.term()); // displayed as string
builder.field(Fields.COUNT, entry.count());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
public static InternalIpTermsFacet readTermsFacet(StreamInput in) throws IOException {
InternalIpTermsFacet facet = new InternalIpTermsFacet();
facet.readFrom(in);
return facet;
}
@Override public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
comparatorType = ComparatorType.fromId(in.readByte());
requiredSize = in.readVInt();
missing = in.readVLong();
int size = in.readVInt();
entries = new ArrayList<LongEntry>(size);
for (int i = 0; i < size; i++) {
entries.add(new LongEntry(in.readLong(), in.readVInt()));
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
out.writeByte(comparatorType.id());
out.writeVInt(requiredSize);
out.writeVLong(missing);
out.writeVInt(entries.size());
for (LongEntry entry : entries) {
out.writeLong(entry.term);
out.writeVInt(entry.count());
}
}
}

View File

@ -0,0 +1,236 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.facet.terms.ip;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.trove.iterator.TLongIntIterator;
import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.longs.LongFieldData;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.facet.AbstractFacetCollector;
import org.elasticsearch.search.facet.Facet;
import org.elasticsearch.search.facet.FacetPhaseExecutionException;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class TermsIpFacetCollector extends AbstractFacetCollector {
static ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>>() {
@Override protected ThreadLocals.CleanableValue<Deque<TLongIntHashMap>> initialValue() {
return new ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>(new ArrayDeque<TLongIntHashMap>());
}
};
private final FieldDataCache fieldDataCache;
private final String indexFieldName;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int numberOfShards;
private final FieldDataType fieldDataType;
private LongFieldData fieldData;
private final StaticAggregatorValueProc aggregator;
private final SearchScript script;
public TermsIpFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
String scriptLang, String script, Map<String, Object> params) {
super(facetName);
this.fieldDataCache = context.fieldDataCache();
this.size = size;
this.comparatorType = comparatorType;
this.numberOfShards = context.numberOfShards();
MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
if (smartMappers == null || !smartMappers.hasMapper()) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms long facet collector on it");
} else {
// add type filter if there is exact doc mapper associated with it
if (smartMappers.hasDocMapper()) {
setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
}
if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) {
throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms long facet collector on it");
}
this.indexFieldName = smartMappers.mapper().names().indexName();
this.fieldDataType = smartMappers.mapper().fieldDataType();
}
if (script != null) {
this.script = context.scriptService().search(context.lookup(), scriptLang, script, params);
} else {
this.script = null;
}
if (this.script == null) {
aggregator = new StaticAggregatorValueProc(popFacets());
} else {
aggregator = new AggregatorValueProc(popFacets(), this.script);
}
if (allTerms) {
try {
for (IndexReader reader : context.searcher().subReaders()) {
LongFieldData fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
fieldData.forEachValue(aggregator);
}
} catch (Exception e) {
throw new FacetPhaseExecutionException(facetName, "failed to load all terms", e);
}
}
}
@Override public void setScorer(Scorer scorer) throws IOException {
if (script != null) {
script.setScorer(scorer);
}
}
@Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
if (script != null) {
script.setNextReader(reader);
}
}
@Override protected void doCollect(int doc) throws IOException {
fieldData.forEachValueInDoc(doc, aggregator);
}
@Override public Facet facet() {
TLongIntHashMap facets = aggregator.facets();
if (facets.isEmpty()) {
pushFacets(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalIpTermsFacet.LongEntry>of(), aggregator.missing());
} else {
// we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards
BoundedTreeSet<InternalIpTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalIpTermsFacet.LongEntry>(comparatorType.comparator(), size * numberOfShards);
for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalIpTermsFacet.LongEntry(it.key(), it.value()));
}
pushFacets(facets);
return new InternalIpTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
}
}
static TLongIntHashMap popFacets() {
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque.isEmpty()) {
deque.add(new TLongIntHashMap());
}
TLongIntHashMap facets = deque.pollFirst();
facets.clear();
return facets;
}
static void pushFacets(TLongIntHashMap facets) {
facets.clear();
Deque<TLongIntHashMap> deque = cache.get().get();
if (deque != null) {
deque.add(facets);
}
}
public static class AggregatorValueProc extends StaticAggregatorValueProc {
private final SearchScript script;
public AggregatorValueProc(TLongIntHashMap facets, SearchScript script) {
super(facets);
this.script = script;
}
@Override public void onValue(int docId, long value) {
if (script != null) {
script.setNextDocId(docId);
script.setNextVar("term", value);
Object scriptValue = script.run();
if (scriptValue == null) {
return;
}
if (scriptValue instanceof Boolean) {
if (!((Boolean) scriptValue)) {
return;
}
} else {
value = ((Number) scriptValue).longValue();
}
}
super.onValue(docId, value);
}
}
public static class StaticAggregatorValueProc implements LongFieldData.ValueInDocProc, LongFieldData.ValueProc {
private final TLongIntHashMap facets;
private int missing;
public StaticAggregatorValueProc(TLongIntHashMap facets) {
this.facets = facets;
}
@Override public void onValue(long value) {
facets.putIfAbsent(value, 0);
}
@Override public void onValue(int docId, long value) {
facets.adjustOrPutValue(value, 1, 1);
}
@Override public void onMissing(int docId) {
missing++;
}
public final TLongIntHashMap facets() {
return facets;
}
public final int missing() {
return this.missing;
}
}
}