From b26d86293f080fd996b4f223efb2fa9f7dbfda4e Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 9 Feb 2011 21:37:42 +0200 Subject: [PATCH] terms facet on an IP field returns terms as numbers, not IPs, closes #678. --- .../facet/terms/InternalTermsFacet.java | 2 + .../facet/terms/TermsFacetProcessor.java | 6 +- .../facet/terms/ip/InternalIpTermsFacet.java | 262 ++++++++++++++++++ .../facet/terms/ip/TermsIpFacetCollector.java | 236 ++++++++++++++++ 4 files changed, 505 insertions(+), 1 deletion(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/InternalIpTermsFacet.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/TermsIpFacetCollector.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/InternalTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/InternalTermsFacet.java index f5c6202c46c..ba88156acb1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/InternalTermsFacet.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/InternalTermsFacet.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.facet.terms.bytes.InternalByteTermsFacet; import org.elasticsearch.search.facet.terms.doubles.InternalDoubleTermsFacet; import org.elasticsearch.search.facet.terms.floats.InternalFloatTermsFacet; import org.elasticsearch.search.facet.terms.ints.InternalIntTermsFacet; +import org.elasticsearch.search.facet.terms.ip.InternalIpTermsFacet; import org.elasticsearch.search.facet.terms.longs.InternalLongTermsFacet; import org.elasticsearch.search.facet.terms.shorts.InternalShortTermsFacet; import org.elasticsearch.search.facet.terms.strings.InternalStringTermsFacet; @@ -44,6 +45,7 @@ public abstract class InternalTermsFacet implements TermsFacet, InternalFacet { InternalFloatTermsFacet.registerStream(); InternalShortTermsFacet.registerStream(); 
InternalByteTermsFacet.registerStream(); + InternalIpTermsFacet.registerStream(); } public abstract Facet reduce(String name, List facets); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java index 9a56441cbb6..3bbfac5f28b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetProcessor.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.field.data.FieldDataType; import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.xcontent.ip.IpFieldMapper; import org.elasticsearch.search.facet.Facet; import org.elasticsearch.search.facet.FacetCollector; import org.elasticsearch.search.facet.FacetProcessor; @@ -36,6 +37,7 @@ import org.elasticsearch.search.facet.terms.doubles.TermsDoubleFacetCollector; import org.elasticsearch.search.facet.terms.floats.TermsFloatFacetCollector; import org.elasticsearch.search.facet.terms.index.IndexNameFacetCollector; import org.elasticsearch.search.facet.terms.ints.TermsIntFacetCollector; +import org.elasticsearch.search.facet.terms.ip.TermsIpFacetCollector; import org.elasticsearch.search.facet.terms.longs.TermsLongFacetCollector; import org.elasticsearch.search.facet.terms.shorts.TermsShortFacetCollector; import org.elasticsearch.search.facet.terms.strings.FieldsTermsStringFacetCollector; @@ -139,7 +141,9 @@ public class TermsFacetProcessor extends AbstractComponent implements FacetProce FieldMapper fieldMapper = context.mapperService().smartNameFieldMapper(field); if (fieldMapper != null) { - if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) { + if (fieldMapper instanceof 
IpFieldMapper) { + return new TermsIpFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params); + } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.LONG) { return new TermsLongFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params); } else if (fieldMapper.fieldDataType() == FieldDataType.DefaultTypes.DOUBLE) { return new TermsDoubleFacetCollector(facetName, field, size, comparatorType, allTerms, context, scriptLang, script, params); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/InternalIpTermsFacet.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/InternalIpTermsFacet.java new file mode 100644 index 00000000000..9dc7cb12c9d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/InternalIpTermsFacet.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.search.facet.terms.ip;
+
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.iterator.TLongIntIterator;
+import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.index.mapper.xcontent.ip.IpFieldMapper;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.terms.InternalTermsFacet;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** Terms facet over an IP field: every term is held as a long and exposed as an IP string through {@link IpFieldMapper#longToIp}.
+ * @author kimchy (shay.banon)
+ */
+public class InternalIpTermsFacet extends InternalTermsFacet {
+    // Stream type id: registered with Streams in registerStream() and reported by streamType().
+    private static final String STREAM_TYPE = "ipTerms";
+
+    public static void registerStream() {
+        Streams.registerStream(STREAM, STREAM_TYPE);
+    }
+
+    static Stream STREAM = new Stream() {
+        @Override public Facet readFacet(String type, StreamInput in) throws IOException {
+            return readTermsFacet(in);
+        }
+    };
+
+    @Override public String streamType() {
+        return STREAM_TYPE;
+    }
+    /** A single facet entry: the raw long term plus its count. */
+    public static class LongEntry implements Entry {
+
+        long term;
+        int count;
+
+        public LongEntry(long term, int count) {
+            this.term = term;
+            this.count = count;
+        }
+        // The term is rendered as an IP string; the raw value stays available through termAsNumber().
+        public String term() {
+            return IpFieldMapper.longToIp(term);
+        }
+
+        public String getTerm() {
+            return term();
+        }
+
+        @Override public Number termAsNumber() {
+            return term;
+        }
+
+        @Override public Number getTermAsNumber() {
+            return termAsNumber();
+        }
+
+        public int count() {
+            return count;
+        }
+
+        public int getCount() {
+            return count();
+        }
+        // Orders by raw term, then count, then identity hash; Long.signum over a long difference keeps the sign without int-subtraction overflow.
+        @Override public int compareTo(Entry o) {
+            long anotherVal = ((LongEntry) o).term;
+            if (term < anotherVal) {
+                return -1;
+            }
+            if (term == anotherVal) {
+                int i = Long.signum((long) count - o.count());
+                if (i == 0) {
+                    i = Long.signum((long) System.identityHashCode(this) - System.identityHashCode(o));
+                }
+                return i;
+            }
+            return 1;
+        }
+    }
+
+    private String name;
+
+    int requiredSize;
+
+    long missing;
+
+    Collection<LongEntry> entries = ImmutableList.of();
+
+    ComparatorType comparatorType;
+
+    InternalIpTermsFacet() {
+    }
+
+    public InternalIpTermsFacet(String name, ComparatorType comparatorType, int requiredSize, Collection<LongEntry> entries, long missing) {
+        this.name = name;
+        this.comparatorType = comparatorType;
+        this.requiredSize = requiredSize;
+        this.entries = entries;
+        this.missing = missing;
+    }
+
+    @Override public String name() {
+        return this.name;
+    }
+
+    @Override public String getName() {
+        return this.name;
+    }
+
+    @Override public String type() {
+        return TYPE;
+    }
+
+    @Override public String getType() {
+        return type();
+    }
+    // Entries may be held as a non-List collection (reduce() installs a BoundedTreeSet); they are copied into a list once, on first access.
+    @Override public List<LongEntry> entries() {
+        if (!(entries instanceof List)) {
+            entries = ImmutableList.copyOf(entries);
+        }
+        return (List<LongEntry>) entries;
+    }
+
+    @Override public List<LongEntry> getEntries() {
+        return entries();
+    }
+
+    @SuppressWarnings({"unchecked"}) @Override public Iterator<Entry> iterator() {
+        return (Iterator) entries.iterator();
+    }
+
+    @Override public long missingCount() {
+        return this.missing;
+    }
+
+    @Override public long getMissingCount() {
+        return missingCount();
+    }
+    // Per-thread scratch map shared by reduce() calls on the same thread; reduce() clears it before use.
+    private static ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>> aggregateCache = new ThreadLocal<ThreadLocals.CleanableValue<TLongIntHashMap>>() {
+        @Override protected ThreadLocals.CleanableValue<TLongIntHashMap> initialValue() {
+            return new ThreadLocals.CleanableValue<TLongIntHashMap>(new TLongIntHashMap());
+        }
+    };
+
+    /** Merges facets: a single facet is returned as is; otherwise missing and per-term counts are summed into the first facet, whose entries become a BoundedTreeSet bounded by its requiredSize. */
+    @Override public Facet reduce(String name, List<Facet> facets) {
+        if (facets.size() == 1) {
+            return facets.get(0);
+        }
+        InternalIpTermsFacet first = (InternalIpTermsFacet) facets.get(0);
+        TLongIntHashMap aggregated = aggregateCache.get().get();
+        aggregated.clear();
+        long missing = 0;
+
+        for (Facet facet : facets) {
+            InternalIpTermsFacet mFacet = (InternalIpTermsFacet) facet;
+            missing += mFacet.missingCount();
+            for (LongEntry entry : mFacet.entries) {
+                aggregated.adjustOrPutValue(entry.term, entry.count(), entry.count());
+            }
+        }
+
+        BoundedTreeSet<LongEntry> ordered = new BoundedTreeSet<LongEntry>(first.comparatorType.comparator(), first.requiredSize);
+        for (TLongIntIterator it = aggregated.iterator(); it.hasNext();) {
+            it.advance();
+            ordered.add(new LongEntry(it.key(), it.value()));
+        }
+        first.entries = ordered;
+        first.missing = missing;
+        return first;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
+        static final XContentBuilderString MISSING = new XContentBuilderString("missing");
+        static final XContentBuilderString TERMS = new XContentBuilderString("terms");
+        static final XContentBuilderString TERM = new XContentBuilderString("term");
+        static final XContentBuilderString COUNT = new XContentBuilderString("count");
+    }
+    /** Writes an object named after the facet holding _type, missing and a terms array of {term, count} objects. */
+    @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(name);
+        builder.field(Fields._TYPE, TermsFacet.TYPE);
+        builder.field(Fields.MISSING, missing);
+        builder.startArray(Fields.TERMS);
+        for (LongEntry entry : entries) {
+            builder.startObject();
+            builder.field(Fields.TERM, entry.term()); // displayed as string
+            builder.field(Fields.COUNT, entry.count());
+            builder.endObject();
+        }
+        builder.endArray();
+        builder.endObject();
+        return builder;
+    }
+
+    public static InternalIpTermsFacet readTermsFacet(StreamInput in) throws IOException {
+        InternalIpTermsFacet facet = new InternalIpTermsFacet();
+        facet.readFrom(in);
+        return facet;
+    }
+    /** Reads the fields in the same order writeTo() emits them. */
+    @Override public void readFrom(StreamInput in) throws IOException {
+        name = in.readUTF();
+        comparatorType = ComparatorType.fromId(in.readByte());
+        requiredSize = in.readVInt();
+        missing = in.readVLong();
+        // entry count, then one (long term, vint count) pair per entry
+        int size = in.readVInt();
+        entries = new ArrayList<LongEntry>(size);
+        for (int i = 0; i < size; i++) {
+            entries.add(new LongEntry(in.readLong(), in.readVInt()));
+        }
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeUTF(name);
+        out.writeByte(comparatorType.id());
+        out.writeVInt(requiredSize);
+        out.writeVLong(missing);
+        // entry count, then one (long term, vint count) pair per entry
+        out.writeVInt(entries.size());
+        for (LongEntry entry : entries) {
+            out.writeLong(entry.term);
+            out.writeVInt(entry.count());
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/TermsIpFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/TermsIpFacetCollector.java
new file mode 100644
index 00000000000..a2e636880c9
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facet/terms/ip/TermsIpFacetCollector.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.facet.terms.ip;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Scorer;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.common.collect.BoundedTreeSet;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.thread.ThreadLocals;
+import org.elasticsearch.common.trove.iterator.TLongIntIterator;
+import org.elasticsearch.common.trove.map.hash.TLongIntHashMap;
+import org.elasticsearch.index.cache.field.data.FieldDataCache;
+import org.elasticsearch.index.field.data.FieldDataType;
+import org.elasticsearch.index.field.data.longs.LongFieldData;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.script.SearchScript;
+import org.elasticsearch.search.facet.AbstractFacetCollector;
+import org.elasticsearch.search.facet.Facet;
+import org.elasticsearch.search.facet.FacetPhaseExecutionException;
+import org.elasticsearch.search.facet.terms.TermsFacet;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+
+/** Facet collector for IP fields: counts values per term on the field's long field data and builds an {@link InternalIpTermsFacet}.
+ * @author kimchy (shay.banon)
+ */
+public class TermsIpFacetCollector extends AbstractFacetCollector {
+    // Per-thread pool of term->count maps; popFacets() borrows one and pushFacets() hands it back.
+    static ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>>() {
+        @Override protected ThreadLocals.CleanableValue<Deque<TLongIntHashMap>> initialValue() {
+            return new ThreadLocals.CleanableValue<Deque<TLongIntHashMap>>(new ArrayDeque<TLongIntHashMap>());
+        }
+    };
+
+
+    private final FieldDataCache fieldDataCache;
+
+    private final String indexFieldName;
+
+    private final TermsFacet.ComparatorType comparatorType;
+
+    private final int size;
+    // shard count reported by the search context; facet() uses it to over-fetch entries
+    private final int numberOfShards;
+
+    private final FieldDataType fieldDataType;
+
+    private LongFieldData fieldData;
+    // value processor fed by doCollect(); the script-aware variant is used when a script was supplied
+    private final StaticAggregatorValueProc aggregator;
+
+    private final SearchScript script;
+    /** Resolves the field's mapper (rejecting unmapped or non-long fields), looks up the optional search script and, when allTerms is set, feeds every value of every sub-reader to the aggregator. */
+    public TermsIpFacetCollector(String facetName, String fieldName, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
+                                 String scriptLang, String script, Map<String, Object> params) {
+        super(facetName);
+        this.fieldDataCache = context.fieldDataCache();
+        this.size = size;
+        this.comparatorType = comparatorType;
+        this.numberOfShards = context.numberOfShards();
+
+        MapperService.SmartNameFieldMappers smartMappers = context.mapperService().smartName(fieldName);
+        if (smartMappers == null || !smartMappers.hasMapper()) {
+            throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] doesn't have a type, can't run terms ip facet collector on it");
+        } else {
+            // add type filter if there is exact doc mapper associated with it
+            if (smartMappers.hasDocMapper()) {
+                setFilter(context.filterCache().cache(smartMappers.docMapper().typeFilter()));
+            }
+
+            if (smartMappers.mapper().fieldDataType() != FieldDataType.DefaultTypes.LONG) {
+                throw new ElasticSearchIllegalArgumentException("Field [" + fieldName + "] is not of long type, can't run terms ip facet collector on it");
+            }
+
+            this.indexFieldName = smartMappers.mapper().names().indexName();
+            this.fieldDataType = smartMappers.mapper().fieldDataType();
+        }
+
+        if (script != null) {
+            this.script = context.scriptService().search(context.lookup(), scriptLang, script, params);
+        } else {
+            this.script = null;
+        }
+
+        if (this.script == null) {
+            aggregator = new StaticAggregatorValueProc(popFacets());
+        } else {
+            aggregator = new AggregatorValueProc(popFacets(), this.script);
+        }
+        // allTerms: pass every field value of every sub-reader to the aggregator (presumably through onValue(long), which seeds the term with a zero count)
+        if (allTerms) {
+            try {
+                for (IndexReader reader : context.searcher().subReaders()) {
+                    LongFieldData fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
+                    fieldData.forEachValue(aggregator);
+                }
+            } catch (Exception e) {
+                throw new FacetPhaseExecutionException(facetName, "failed to load all terms", e);
+            }
+        }
+    }
+
+    @Override public void setScorer(Scorer scorer) throws IOException {
+        if (script != null) {
+            script.setScorer(scorer);
+        }
+    }
+
+    @Override protected void doSetNextReader(IndexReader reader, int docBase) throws IOException {
+        fieldData = (LongFieldData) fieldDataCache.cache(fieldDataType, reader, indexFieldName);
+        if (script != null) {
+            script.setNextReader(reader);
+        }
+    }
+
+    @Override protected void doCollect(int doc) throws IOException {
+        fieldData.forEachValueInDoc(doc, aggregator);
+    }
+    /** Builds the facet from the collected counts and returns the count map to the per-thread pool. */
+    @Override public Facet facet() {
+        TLongIntHashMap facets = aggregator.facets();
+        if (facets.isEmpty()) {
+            pushFacets(facets);
+            return new InternalIpTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalIpTermsFacet.LongEntry>of(), aggregator.missing());
+        } else {
+            // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards; the product is computed as a long and clamped so a large size cannot wrap to a negative bound
+            BoundedTreeSet<InternalIpTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalIpTermsFacet.LongEntry>(comparatorType.comparator(), (int) Math.min((long) size * numberOfShards, Integer.MAX_VALUE));
+            for (TLongIntIterator it = facets.iterator(); it.hasNext();) {
+                it.advance();
+                ordered.add(new InternalIpTermsFacet.LongEntry(it.key(), it.value()));
+            }
+            pushFacets(facets);
+            return new InternalIpTermsFacet(facetName, comparatorType, size, ordered, aggregator.missing());
+        }
+    }
+    /** Borrows a cleared map from the per-thread pool, creating one first when the pool is empty. */
+    static TLongIntHashMap popFacets() {
+        Deque<TLongIntHashMap> deque = cache.get().get();
+        if (deque.isEmpty()) {
+            deque.add(new TLongIntHashMap());
+        }
+        TLongIntHashMap facets = deque.pollFirst();
+        facets.clear();
+        return facets;
+    }
+    /** Clears the map and hands it back to the per-thread pool. */
+    static void pushFacets(TLongIntHashMap facets) {
+        facets.clear();
+        Deque<TLongIntHashMap> deque = cache.get().get();
+        if (deque != null) {
+            deque.add(facets);
+        }
+    }
+    /** Aggregator that runs a script per value: a null or false result skips the value, true keeps it, any other result (cast to Number) replaces the term with its long value. */
+    public static class AggregatorValueProc extends StaticAggregatorValueProc {
+
+        private final SearchScript script;
+
+        public AggregatorValueProc(TLongIntHashMap facets, SearchScript script) {
+            super(facets);
+            this.script = script;
+        }
+
+        @Override public void onValue(int docId, long value) {
+            if (script != null) {
+                script.setNextDocId(docId);
+                script.setNextVar("term", value);
+                Object scriptValue = script.run();
+                if (scriptValue == null) {
+                    return;
+                }
+                if (scriptValue instanceof Boolean) {
+                    if (!((Boolean) scriptValue)) {
+                        return;
+                    }
+                } else {
+                    value = ((Number) scriptValue).longValue();
+                }
+            }
+            super.onValue(docId, value);
+        }
+    }
+    /** Script-less aggregator: keeps a count per term and a tally of onMissing callbacks. */
+    public static class StaticAggregatorValueProc implements LongFieldData.ValueInDocProc, LongFieldData.ValueProc {
+
+        private final TLongIntHashMap facets;
+
+        private int missing;
+
+        public StaticAggregatorValueProc(TLongIntHashMap facets) {
+            this.facets = facets;
+        }
+        // value without a document: make sure the term is present, leaving any existing count untouched
+        @Override public void onValue(long value) {
+            facets.putIfAbsent(value, 0);
+        }
+        // value seen in a document: increment the term's count, starting at 1
+        @Override public void onValue(int docId, long value) {
+            facets.adjustOrPutValue(value, 1, 1);
+        }
+
+        @Override public void onMissing(int docId) {
+            missing++;
+        }
+
+        public final TLongIntHashMap facets() {
+            return facets;
+        }
+
+        public final int missing() {
+            return this.missing;
+        }
+    }
+}