Make ip fields backward-compatible at query time. #18593

The fact that ip fields used a different doc values representation in 2.x causes
issues when querying 2.x and 5.0 indices in the same request. This change hides
the 2.x doc values on ip fields behind binary doc values that use the same
encoding as 5.0. This way the coordinating node will be able to merge shard
responses that have different major versions.

One known issue is that this makes sorting/aggregating slower on ip fields for
indices that were generated with Elasticsearch 2.x.
Adrien Grand 2016-05-26 15:26:17 +02:00
parent 85bf48b4c1
commit adf4712164
6 changed files with 457 additions and 11 deletions

org/elasticsearch/index/mapper/ip/LegacyIpFieldMapper.java

@@ -35,20 +35,21 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.network.Cidrs;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.IndexFieldData;
- import org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType;
- import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData;
+ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.core.LegacyLongFieldMapper;
import org.elasticsearch.index.mapper.core.LegacyLongFieldMapper.CustomLongNumericField;
import org.elasticsearch.index.mapper.core.LegacyNumberFieldMapper;
import org.elasticsearch.index.query.QueryShardContext;
+ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.DocValueFormat;
import org.joda.time.DateTimeZone;
@@ -249,7 +250,14 @@ public class LegacyIpFieldMapper extends LegacyNumberFieldMapper {
@Override
public IndexFieldData.Builder fielddataBuilder() {
failIfNoDocValues();
- return new DocValuesIndexFieldData.Builder().numericType(NumericType.LONG);
+ return new IndexFieldData.Builder() {
+ @Override
+ public IndexFieldData<?> build(IndexSettings indexSettings,
+ MappedFieldType fieldType, IndexFieldDataCache cache,
+ CircuitBreakerService breakerService, MapperService mapperService) {
+ return new LegacyIpIndexFieldData(indexSettings.getIndex(), name());
+ }
+ };
}
@Override

org/elasticsearch/index/mapper/ip/LegacyIpIndexFieldData.java

@@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.ip;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.search.MultiValueMode;
final class LegacyIpIndexFieldData implements IndexFieldData<AtomicFieldData> {
protected final Index index;
protected final String fieldName;
protected final ESLogger logger;
public LegacyIpIndexFieldData(Index index, String fieldName) {
this.index = index;
this.fieldName = fieldName;
this.logger = Loggers.getLogger(getClass());
}
public final String getFieldName() {
return fieldName;
}
public final void clear() {
// nothing to do
}
public final void clear(IndexReader reader) {
// nothing to do
}
public final Index index() {
return index;
}
@Override
public AtomicFieldData load(LeafReaderContext context) {
return new AtomicFieldData() {
@Override
public void close() {
// no-op
}
@Override
public long ramBytesUsed() {
return 0;
}
@Override
public ScriptDocValues<?> getScriptValues() {
throw new UnsupportedOperationException("Cannot run scripts on ip fields");
}
@Override
public SortedBinaryDocValues getBytesValues() {
SortedNumericDocValues values;
try {
values = DocValues.getSortedNumeric(context.reader(), fieldName);
} catch (IOException e) {
throw new IllegalStateException("Cannot load doc values", e);
}
return new SortedBinaryDocValues() {
final ByteBuffer scratch = ByteBuffer.allocate(4);
@Override
public BytesRef valueAt(int index) {
// we do not need to reorder ip addresses since both the numeric
// encoding of LegacyIpFieldMapper and the binary encoding of
// IpFieldMapper match the sort order of ip addresses
long ip = values.valueAt(index);
scratch.putInt(0, (int) ip);
InetAddress inet;
try {
inet = InetAddress.getByAddress(scratch.array());
} catch (UnknownHostException e) {
throw new IllegalStateException("Cannot happen", e);
}
byte[] encoded = InetAddressPoint.encode(inet);
return new BytesRef(encoded);
}
@Override
public void setDocument(int docId) {
values.setDocument(docId);
}
@Override
public int count() {
return values.count();
}
};
}
};
}
@Override
public AtomicFieldData loadDirect(LeafReaderContext context)
throws Exception {
return load(context);
}
@Override
public IndexFieldData.XFieldComparatorSource comparatorSource(
Object missingValue, MultiValueMode sortMode, Nested nested) {
return new BytesRefFieldComparatorSource(this, missingValue, sortMode, nested);
}
}

org/elasticsearch/search/DocValueFormat.java

@@ -30,7 +30,6 @@ import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
- import org.elasticsearch.index.mapper.ip.LegacyIpFieldMapper;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@@ -284,12 +283,12 @@ public interface DocValueFormat extends NamedWriteable {
@Override
public String format(long value) {
- return LegacyIpFieldMapper.longToIp(value);
+ throw new UnsupportedOperationException();
}
@Override
public String format(double value) {
- return format((long) value);
+ throw new UnsupportedOperationException();
}
@Override
@@ -301,13 +300,12 @@ public interface DocValueFormat extends NamedWriteable {
@Override
public long parseLong(String value, boolean roundUp, Callable<Long> now) {
- // TODO: throw exception in 6.0
- return LegacyIpFieldMapper.ipToLong(value);
+ throw new UnsupportedOperationException();
}
@Override
public double parseDouble(String value, boolean roundUp, Callable<Long> now) {
- return parseLong(value, roundUp, now);
+ throw new UnsupportedOperationException();
}
@Override

org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java

@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
+ import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -104,8 +105,16 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
collectBucket(sub, doc, bucket);
}
};
+ } else {
+ SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
+ return new SortedBinaryRangeLeafCollector(values, ranges, sub) {
+ @Override
+ protected void doCollect(LeafBucketCollector sub, int doc, long bucket)
+ throws IOException {
+ collectBucket(sub, doc, bucket);
+ }
+ };
}
- throw new IllegalArgumentException("binary range aggregation expects a values source that supports ordinals");
}
static abstract class SortedSetRangeLeafCollector extends LeafBucketCollectorBase {
@@ -214,6 +223,99 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
protected abstract void doCollect(LeafBucketCollector sub, int doc, long bucket) throws IOException;
}
static abstract class SortedBinaryRangeLeafCollector extends LeafBucketCollectorBase {
final Range[] ranges;
final BytesRef[] maxTos;
final SortedBinaryDocValues values;
final LeafBucketCollector sub;
SortedBinaryRangeLeafCollector(SortedBinaryDocValues values,
Range[] ranges, LeafBucketCollector sub) {
super(sub, values);
for (int i = 1; i < ranges.length; ++i) {
if (RANGE_COMPARATOR.compare(ranges[i-1], ranges[i]) > 0) {
throw new IllegalArgumentException("Ranges must be sorted");
}
}
this.values = values;
this.sub = sub;
this.ranges = ranges;
maxTos = new BytesRef[ranges.length];
if (ranges.length > 0) {
maxTos[0] = ranges[0].to;
}
for (int i = 1; i < ranges.length; ++i) {
if (compare(ranges[i].to, maxTos[i-1], -1) >= 0) {
maxTos[i] = ranges[i].to;
} else {
maxTos[i] = maxTos[i-1];
}
}
}
@Override
public void collect(int doc, long bucket) throws IOException {
values.setDocument(doc);
final int valuesCount = values.count();
for (int i = 0, lo = 0; i < valuesCount; ++i) {
final BytesRef value = values.valueAt(i);
lo = collect(doc, value, bucket, lo);
}
}
private int collect(int doc, BytesRef value, long bucket, int lowBound) throws IOException {
int lo = lowBound, hi = ranges.length - 1; // all candidates are between these indexes
int mid = (lo + hi) >>> 1;
while (lo <= hi) {
if (compare(value, ranges[mid].from, 1) < 0) {
hi = mid - 1;
} else if (compare(value, maxTos[mid], -1) >= 0) {
lo = mid + 1;
} else {
break;
}
mid = (lo + hi) >>> 1;
}
if (lo > hi) return lo; // no potential candidate
// binary search the lower bound
int startLo = lo, startHi = mid;
while (startLo <= startHi) {
final int startMid = (startLo + startHi) >>> 1;
if (compare(value, maxTos[startMid], -1) >= 0) {
startLo = startMid + 1;
} else {
startHi = startMid - 1;
}
}
// binary search the upper bound
int endLo = mid, endHi = hi;
while (endLo <= endHi) {
final int endMid = (endLo + endHi) >>> 1;
if (compare(value, ranges[endMid].from, 1) < 0) {
endHi = endMid - 1;
} else {
endLo = endMid + 1;
}
}
assert startLo == lowBound || compare(value, maxTos[startLo - 1], -1) >= 0;
assert endHi == ranges.length - 1 || compare(value, ranges[endHi + 1].from, 1) < 0;
for (int i = startLo; i <= endHi; ++i) {
if (compare(value, ranges[i].to, -1) < 0) {
doCollect(sub, doc, bucket * ranges.length + i);
}
}
return endHi + 1;
}
protected abstract void doCollect(LeafBucketCollector sub, int doc, long bucket) throws IOException;
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
InternalBinaryRange.Bucket[] buckets = new InternalBinaryRange.Bucket[ranges.length];

org/elasticsearch/bwcompat/IpFieldBwCompatIT.java

@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.bwcompat;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import java.util.Collection;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
@ESIntegTestCase.SuiteScopeTestCase
public class IpFieldBwCompatIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(InternalSettingsPlugin.class); // uses index.version.created
}
@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(prepareCreate("old_index")
.setSettings(IndexMetaData.SETTING_VERSION_CREATED, Version.V_2_3_3.id)
.addMapping("type", "ip_field", "type=ip"));
assertAcked(prepareCreate("new_index")
.addMapping("type", "ip_field", "type=ip"));
indexRandom(true,
client().prepareIndex("old_index", "type", "1").setSource("ip_field", "127.0.0.1"),
client().prepareIndex("new_index", "type", "1").setSource("ip_field", "127.0.0.1"),
client().prepareIndex("new_index", "type", "2").setSource("ip_field", "::1"));
}
public void testSort() {
SearchResponse response = client().prepareSearch("old_index", "new_index")
.addSort(SortBuilders.fieldSort("ip_field")).get();
assertNoFailures(response);
assertEquals(3, response.getHits().totalHits());
assertEquals("::1", response.getHits().getAt(0).getSortValues()[0]);
assertEquals("127.0.0.1", response.getHits().getAt(1).getSortValues()[0]);
assertEquals("127.0.0.1", response.getHits().getAt(2).getSortValues()[0]);
}
public void testRangeAgg() {
SearchResponse response = client().prepareSearch("old_index", "new_index")
.addAggregation(AggregationBuilders.ipRange("ip_range").field("ip_field")
.addMaskRange("127.0.0.1/16")
.addMaskRange("::1/64")).get();
assertNoFailures(response);
assertEquals(3, response.getHits().totalHits());
Range range = response.getAggregations().get("ip_range");
assertEquals(2, range.getBuckets().size());
assertEquals("::1/64", range.getBuckets().get(0).getKeyAsString());
assertEquals(3, range.getBuckets().get(0).getDocCount());
assertEquals("127.0.0.1/16", range.getBuckets().get(1).getKeyAsString());
assertEquals(2, range.getBuckets().get(1).getDocCount());
}
public void testTermsAgg() {
SearchResponse response = client().prepareSearch("old_index", "new_index")
.addAggregation(AggregationBuilders.terms("ip_terms").field("ip_field")).get();
assertNoFailures(response);
assertEquals(3, response.getHits().totalHits());
Terms terms = response.getAggregations().get("ip_terms");
assertEquals(2, terms.getBuckets().size());
assertEquals(2, terms.getBucketByKey("127.0.0.1").getDocCount());
assertEquals(1, terms.getBucketByKey("::1").getDocCount());
}
}

org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregatorTests.java

@@ -26,7 +26,9 @@ import java.util.Set;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.TestUtil;
+ import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
+ import org.elasticsearch.search.aggregations.bucket.range.BinaryRangeAggregator.SortedBinaryRangeLeafCollector;
import org.elasticsearch.search.aggregations.bucket.range.BinaryRangeAggregator.SortedSetRangeLeafCollector;
import org.elasticsearch.test.ESTestCase;
@@ -139,4 +141,101 @@ public class BinaryRangeAggregatorTests extends ESTestCase {
doTestSortedSetRangeLeafCollector(5);
}
}
private static class FakeSortedBinaryDocValues extends SortedBinaryDocValues {
private final BytesRef[] terms;
long[] ords;
FakeSortedBinaryDocValues(BytesRef[] terms) {
this.terms = terms;
}
@Override
public void setDocument(int docID) {
// no-op
}
@Override
public int count() {
return ords.length;
}
@Override
public BytesRef valueAt(int index) {
return terms[(int) ords[index]];
}
}
private void doTestSortedBinaryRangeLeafCollector(int maxNumValuesPerDoc) throws Exception {
final Set<BytesRef> termSet = new HashSet<>();
final int numTerms = TestUtil.nextInt(random(), maxNumValuesPerDoc, 100);
while (termSet.size() < numTerms) {
termSet.add(new BytesRef(TestUtil.randomSimpleString(random(), randomInt(2))));
}
final BytesRef[] terms = termSet.toArray(new BytesRef[0]);
Arrays.sort(terms);
final int numRanges = randomIntBetween(1, 10);
BinaryRangeAggregator.Range[] ranges = new BinaryRangeAggregator.Range[numRanges];
for (int i = 0; i < numRanges; ++i) {
ranges[i] = new BinaryRangeAggregator.Range(Integer.toString(i),
randomBoolean() ? null : new BytesRef(TestUtil.randomSimpleString(random(), randomInt(2))),
randomBoolean() ? null : new BytesRef(TestUtil.randomSimpleString(random(), randomInt(2))));
}
Arrays.sort(ranges, BinaryRangeAggregator.RANGE_COMPARATOR);
FakeSortedBinaryDocValues values = new FakeSortedBinaryDocValues(terms);
final int[] counts = new int[ranges.length];
SortedBinaryRangeLeafCollector collector = new SortedBinaryRangeLeafCollector(values, ranges, null) {
@Override
protected void doCollect(LeafBucketCollector sub, int doc, long bucket) throws IOException {
counts[(int) bucket]++;
}
};
final int[] expectedCounts = new int[ranges.length];
final int maxDoc = randomIntBetween(5, 10);
for (int doc = 0; doc < maxDoc; ++doc) {
LongHashSet ordinalSet = new LongHashSet();
final int numValues = randomInt(maxNumValuesPerDoc);
while (ordinalSet.size() < numValues) {
ordinalSet.add(random().nextInt(terms.length));
}
final long[] ords = ordinalSet.toArray();
Arrays.sort(ords);
values.ords = ords;
// simulate aggregation
collector.collect(doc);
// now do it the naive way
for (int i = 0; i < ranges.length; ++i) {
for (long ord : ords) {
BytesRef term = terms[(int) ord];
if ((ranges[i].from == null || ranges[i].from.compareTo(term) <= 0)
&& (ranges[i].to == null || ranges[i].to.compareTo(term) > 0)) {
expectedCounts[i]++;
break;
}
}
}
}
assertArrayEquals(expectedCounts, counts);
}
public void testSortedBinaryRangeLeafCollectorSingleValued() throws Exception {
final int iters = randomInt(10);
for (int i = 0; i < iters; ++i) {
doTestSortedBinaryRangeLeafCollector(1);
}
}
public void testSortedBinaryRangeLeafCollectorMultiValued() throws Exception {
final int iters = randomInt(10);
for (int i = 0; i < iters; ++i) {
doTestSortedBinaryRangeLeafCollector(5);
}
}
}