From 36991192729d244f211f8c29e810c8437967c449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 23 Jul 2015 00:53:23 +0200 Subject: [PATCH 01/12] Remove unused QueryParseContext argument in MappedFieldType#rangeQuery() The rangeQuery() method in MappedFieldType and some overwriting subtypes takes a nullable QueryParseContext argument which is unused and can be deleted. This is also useful for the current query parsing refactoring, since we want to avoid passing the context object around unnecessarily. --- .../classic/MapperQueryParser.java | 4 ++-- .../index/mapper/MappedFieldType.java | 4 ++-- .../index/mapper/core/ByteFieldMapper.java | 5 +--- .../index/mapper/core/DateFieldMapper.java | 15 ++++++++---- .../index/mapper/core/DoubleFieldMapper.java | 5 ++-- .../index/mapper/core/FloatFieldMapper.java | 5 ++-- .../index/mapper/core/IntegerFieldMapper.java | 5 +--- .../index/mapper/core/LongFieldMapper.java | 5 +--- .../index/mapper/core/ShortFieldMapper.java | 5 +--- .../index/mapper/ip/IpFieldMapper.java | 5 +--- .../index/query/ExistsQueryParser.java | 3 +-- .../index/query/MissingQueryParser.java | 2 +- .../index/query/RangeQueryParser.java | 4 ++-- .../geo/IndexedGeoBoundingBoxQuery.java | 10 ++++---- .../indices/ttl/IndicesTTLService.java | 2 +- .../mapper/date/SimpleDateMappingTests.java | 24 +++++++++---------- 16 files changed, 45 insertions(+), 58 deletions(-) diff --git a/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java b/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java index 065617ce5ee..4cfdc256418 100644 --- a/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java +++ b/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java @@ -374,9 +374,9 @@ public class MapperQueryParser extends QueryParser { Query rangeQuery; if (currentFieldType instanceof DateFieldMapper.DateFieldType && settings.timeZone() != null) { DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) this.currentFieldType; - rangeQuery = dateFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, settings.timeZone(), null, parseContext); + rangeQuery = dateFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, settings.timeZone(), null); } else { - rangeQuery = currentFieldType.rangeQuery(part1, part2, startInclusive, endInclusive, parseContext); + rangeQuery = currentFieldType.rangeQuery(part1, part2, startInclusive, endInclusive); } return rangeQuery; } catch (RuntimeException e) { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java b/core/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java index fff041090a8..7dc78ddd5e0 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java @@ -30,7 +30,6 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lucene.BytesRefs; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Fuzziness; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.FieldDataType; @@ -186,6 +185,7 @@ public abstract class MappedFieldType extends FieldType { fieldDataType = new FieldDataType(typeName()); } + @Override public abstract MappedFieldType clone(); @Override @@ -449,7 +449,7 @@ public abstract class MappedFieldType extends FieldType { return new TermsQuery(names.indexName(), bytesRefs); } - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return new TermRangeQuery(names().indexName(), lowerTerm == null ? null : indexedValueForSearch(lowerTerm), upperTerm == null ? null : indexedValueForSearch(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/ByteFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/ByteFieldMapper.java index 92944f7e8fd..61b22a1ee26 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/ByteFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/ByteFieldMapper.java @@ -30,7 +30,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Fuzziness; @@ -42,8 +41,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -163,7 +160,7 @@ public class ByteFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : (int)parseValue(lowerTerm), upperTerm == null ? null : (int)parseValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java index 5a96541d91b..c14b6b2ae11 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java @@ -51,12 +51,15 @@ import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.core.LongFieldMapper.CustomLongNumericField; -import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.search.internal.SearchContext; import org.joda.time.DateTimeZone; import java.io.IOException; -import java.util.*; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -124,6 +127,7 @@ public class DateFieldMapper extends NumberFieldMapper { return fieldMapper; } + @Override protected void setupFieldType(BuilderContext context) { if (Version.indexCreated(context.indexSettings()).before(Version.V_2_0_0_beta1) && !fieldType().dateTimeFormatter().format().contains("epoch_")) { @@ -277,6 +281,7 @@ public class DateFieldMapper extends NumberFieldMapper { this.dateMathParser = ref.dateMathParser; } + @Override public DateFieldType clone() { return new DateFieldType(this); } @@ -390,8 +395,8 @@ public class DateFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { - return rangeQuery(lowerTerm, upperTerm, includeLower, includeUpper, null, null, context); + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { + return rangeQuery(lowerTerm, upperTerm, includeLower, includeUpper, null, null); } @Override @@ -419,7 +424,7 @@ public class DateFieldMapper extends NumberFieldMapper { ); } - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable DateTimeZone timeZone, @Nullable DateMathParser forcedDateParser, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable DateTimeZone timeZone, @Nullable DateMathParser forcedDateParser) { return new LateParsingQuery(lowerTerm, upperTerm, includeLower, includeUpper, timeZone, forcedDateParser); } diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/DoubleFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/DoubleFieldMapper.java index 49ea3c2e25a..7f06c223e62 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/DoubleFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/DoubleFieldMapper.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.mapper.core; import com.carrotsearch.hppc.DoubleArrayList; + import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.document.Field; @@ -32,7 +33,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Fuzziness; @@ -46,7 +46,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; import java.io.IOException; import java.util.Iterator; @@ -169,7 +168,7 @@ public class DoubleFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newDoubleRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : parseDoubleValue(lowerTerm), upperTerm == null ? null : parseDoubleValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/FloatFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/FloatFieldMapper.java index 8012672c369..caeb2d7a188 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/FloatFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/FloatFieldMapper.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.mapper.core; import com.carrotsearch.hppc.FloatArrayList; + import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.document.Field; @@ -32,7 +33,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; import java.io.IOException; import java.util.Iterator; @@ -170,7 +169,7 @@ public class FloatFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newFloatRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : parseValue(lowerTerm), upperTerm == null ? null : parseValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/IntegerFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/IntegerFieldMapper.java index 1271de9808b..868cfeb4380 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/IntegerFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/IntegerFieldMapper.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -172,7 +169,7 @@ public class IntegerFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : parseValue(lowerTerm), upperTerm == null ? null : parseValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/LongFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/LongFieldMapper.java index 9542b508a19..4130c902586 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/LongFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/LongFieldMapper.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -170,7 +167,7 @@ public class LongFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newLongRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : parseLongValue(lowerTerm), upperTerm == null ? null : parseLongValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/ShortFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/ShortFieldMapper.java index b40d570e69d..81ed6cc3bac 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/ShortFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/ShortFieldMapper.java @@ -31,7 +31,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -44,8 +43,6 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.query.QueryParseContext; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -168,7 +165,7 @@ public class ShortFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newIntRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : (int)parseValue(lowerTerm), upperTerm == null ? null : (int)parseValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ip/IpFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/ip/IpFieldMapper.java index 8e4e7c48783..1ac34df5063 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ip/IpFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ip/IpFieldMapper.java @@ -29,7 +29,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.common.Explicit; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -47,8 +46,6 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.core.LongFieldMapper; import org.elasticsearch.index.mapper.core.LongFieldMapper.CustomLongNumericField; import org.elasticsearch.index.mapper.core.NumberFieldMapper; -import org.elasticsearch.index.query.QueryParseContext; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -209,7 +206,7 @@ public class IpFieldMapper extends NumberFieldMapper { } @Override - public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) { + public Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper) { return NumericRangeQuery.newLongRange(names().indexName(), numericPrecisionStep(), lowerTerm == null ? null : parseValue(lowerTerm), upperTerm == null ? null : parseValue(upperTerm), diff --git a/core/src/main/java/org/elasticsearch/index/query/ExistsQueryParser.java b/core/src/main/java/org/elasticsearch/index/query/ExistsQueryParser.java index 2617d143e43..0ce578caad5 100644 --- a/core/src/main/java/org/elasticsearch/index/query/ExistsQueryParser.java +++ b/core/src/main/java/org/elasticsearch/index/query/ExistsQueryParser.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.internal.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; @@ -111,7 +110,7 @@ public class ExistsQueryParser implements QueryParser { } // if _field_names are not indexed, we need to go the slow way if (filter == null && fieldType != null) { - filter = fieldType.rangeQuery(null, null, true, true, parseContext); + filter = fieldType.rangeQuery(null, null, true, true); } if (filter == null) { filter = new TermRangeQuery(field, null, null, true, true); diff --git a/core/src/main/java/org/elasticsearch/index/query/MissingQueryParser.java b/core/src/main/java/org/elasticsearch/index/query/MissingQueryParser.java index f76478ffba0..6ef19d7e4f6 100644 --- a/core/src/main/java/org/elasticsearch/index/query/MissingQueryParser.java +++ b/core/src/main/java/org/elasticsearch/index/query/MissingQueryParser.java @@ -133,7 +133,7 @@ public class MissingQueryParser implements QueryParser { } // if _field_names are not indexed, we need to go the slow way if (filter == null && fieldType != null) { - filter = fieldType.rangeQuery(null, null, true, true, parseContext); + filter = fieldType.rangeQuery(null, null, true, true); } if (filter == null) { filter = new TermRangeQuery(field, null, null, true, true); diff --git a/core/src/main/java/org/elasticsearch/index/query/RangeQueryParser.java b/core/src/main/java/org/elasticsearch/index/query/RangeQueryParser.java index bf1af881192..6e4cd451ee9 100644 --- a/core/src/main/java/org/elasticsearch/index/query/RangeQueryParser.java +++ b/core/src/main/java/org/elasticsearch/index/query/RangeQueryParser.java @@ -123,14 +123,14 @@ public class RangeQueryParser implements QueryParser { MappedFieldType mapper = parseContext.fieldMapper(fieldName); if (mapper != null) { if (mapper instanceof DateFieldMapper.DateFieldType) { - query = ((DateFieldMapper.DateFieldType) mapper).rangeQuery(from, to, includeLower, includeUpper, timeZone, forcedDateParser, parseContext); + query = ((DateFieldMapper.DateFieldType) mapper).rangeQuery(from, to, includeLower, includeUpper, timeZone, forcedDateParser); } else { if (timeZone != null) { throw new QueryParsingException(parseContext, "[range] time_zone can not be applied to non date field [" + fieldName + "]"); } //LUCENE 4 UPGRADE Mapper#rangeQuery should use bytesref as well? - query = mapper.rangeQuery(from, to, includeLower, includeUpper, parseContext); + query = mapper.rangeQuery(from, to, includeLower, includeUpper); } } if (query == null) { diff --git a/core/src/main/java/org/elasticsearch/index/search/geo/IndexedGeoBoundingBoxQuery.java b/core/src/main/java/org/elasticsearch/index/search/geo/IndexedGeoBoundingBoxQuery.java index 265eaca49dc..117914518b4 100644 --- a/core/src/main/java/org/elasticsearch/index/search/geo/IndexedGeoBoundingBoxQuery.java +++ b/core/src/main/java/org/elasticsearch/index/search/geo/IndexedGeoBoundingBoxQuery.java @@ -45,16 +45,16 @@ public class IndexedGeoBoundingBoxQuery { private static Query westGeoBoundingBoxFilter(GeoPoint topLeft, GeoPoint bottomRight, GeoPointFieldMapper.GeoPointFieldType fieldType) { BooleanQuery filter = new BooleanQuery(); filter.setMinimumNumberShouldMatch(1); - filter.add(fieldType.lonFieldType().rangeQuery(null, bottomRight.lon(), true, true, null), Occur.SHOULD); - filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), null, true, true, null), Occur.SHOULD); - filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true, null), Occur.MUST); + filter.add(fieldType.lonFieldType().rangeQuery(null, bottomRight.lon(), true, true), Occur.SHOULD); + filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), null, true, true), Occur.SHOULD); + filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true), Occur.MUST); return new ConstantScoreQuery(filter); } private static Query eastGeoBoundingBoxFilter(GeoPoint topLeft, GeoPoint bottomRight, GeoPointFieldMapper.GeoPointFieldType fieldType) { BooleanQuery filter = new BooleanQuery(); - filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), bottomRight.lon(), true, true, null), Occur.MUST); - filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true, null), Occur.MUST); + filter.add(fieldType.lonFieldType().rangeQuery(topLeft.lon(), bottomRight.lon(), true, true), Occur.MUST); + filter.add(fieldType.latFieldType().rangeQuery(bottomRight.lat(), topLeft.lat(), true, true), Occur.MUST); return new ConstantScoreQuery(filter); } } diff --git a/core/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java b/core/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java index 2eb1c7d4a05..de44343a29d 100644 --- a/core/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java +++ b/core/src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java @@ -196,7 +196,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent shardsToPurge) { for (IndexShard shardToPurge : shardsToPurge) { - Query query = shardToPurge.indexService().mapperService().smartNameFieldType(TTLFieldMapper.NAME).rangeQuery(null, System.currentTimeMillis(), false, true, null); + Query query = shardToPurge.indexService().mapperService().smartNameFieldType(TTLFieldMapper.NAME).rangeQuery(null, System.currentTimeMillis(), false, true); Engine.Searcher searcher = shardToPurge.acquireSearcher("indices_ttl"); try { logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id()); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java b/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java index fc641b7e4e1..8db412ffc83 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java @@ -61,7 +61,7 @@ import static org.elasticsearch.index.mapper.string.SimpleStringMappingTests.doc import static org.hamcrest.Matchers.*; public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { - + public void testAutomaticDateParser() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") .startObject("properties").endObject() @@ -93,12 +93,12 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { fieldMapper = defaultMapper.mappers().smartNameFieldMapper("wrong_date3"); assertThat(fieldMapper, instanceOf(StringFieldMapper.class)); } - + public void testParseLocal() { assertThat(Locale.GERMAN, equalTo(LocaleUtils.parse("de"))); assertThat(Locale.GERMANY, equalTo(LocaleUtils.parse("de_DE"))); assertThat(new Locale("de","DE","DE"), equalTo(LocaleUtils.parse("de_DE_DE"))); - + try { LocaleUtils.parse("de_DE_DE_DE"); fail(); @@ -108,7 +108,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { assertThat(Locale.ROOT, equalTo(LocaleUtils.parse(""))); assertThat(Locale.ROOT, equalTo(LocaleUtils.parse("ROOT"))); } - + public void testLocale() throws IOException { assumeFalse("Locals are buggy on JDK9EA", Constants.JRE_IS_MINIMUM_JAVA9 && systemPropertyAsBoolean("tests.security.manager", false)); String mapping = XContentFactory.jsonBuilder() @@ -169,7 +169,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { private void assertNumericTokensEqual(ParsedDocument doc, DocumentMapper defaultMapper, String fieldA, String fieldB) throws IOException { assertThat(doc.rootDoc().getField(fieldA).tokenStream(defaultMapper.mappers().indexAnalyzer(), null), notNullValue()); assertThat(doc.rootDoc().getField(fieldB).tokenStream(defaultMapper.mappers().indexAnalyzer(), null), notNullValue()); - + TokenStream tokenStream = doc.rootDoc().getField(fieldA).tokenStream(defaultMapper.mappers().indexAnalyzer(), null); tokenStream.reset(); NumericTermAttribute nta = tokenStream.addAttribute(NumericTermAttribute.class); @@ -177,7 +177,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { while(tokenStream.incrementToken()) { values.add(nta.getRawValue()); } - + tokenStream = doc.rootDoc().getField(fieldB).tokenStream(defaultMapper.mappers().indexAnalyzer(), null); tokenStream.reset(); nta = tokenStream.addAttribute(NumericTermAttribute.class); @@ -223,7 +223,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { assertThat(doc.rootDoc().get("date_field"), equalTo("1262304000000")); assertThat(doc.rootDoc().get("date_field_x"), equalTo("2010-01-01")); } - + public void testHourFormat() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") .field("date_detection", false) @@ -242,14 +242,14 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { NumericRangeQuery rangeQuery; try { SearchContext.setCurrent(new TestSearchContext()); - rangeQuery = (NumericRangeQuery) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("10:00:00", "11:00:00", true, true, null).rewrite(null); + rangeQuery = (NumericRangeQuery) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("10:00:00", "11:00:00", true, true).rewrite(null); } finally { SearchContext.removeCurrent(); } assertThat(rangeQuery.getMax(), equalTo(new DateTime(TimeValue.timeValueHours(11).millis(), DateTimeZone.UTC).getMillis())); assertThat(rangeQuery.getMin(), equalTo(new DateTime(TimeValue.timeValueHours(10).millis(), DateTimeZone.UTC).getMillis())); } - + public void testDayWithoutYearFormat() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") .field("date_detection", false) @@ -268,14 +268,14 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { NumericRangeQuery rangeQuery; try { SearchContext.setCurrent(new TestSearchContext()); - rangeQuery = (NumericRangeQuery) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("Jan 02 10:00:00", "Jan 02 11:00:00", true, true, null).rewrite(null); + rangeQuery = (NumericRangeQuery) defaultMapper.mappers().smartNameFieldMapper("date_field").fieldType().rangeQuery("Jan 02 10:00:00", "Jan 02 11:00:00", true, true).rewrite(null); } finally { SearchContext.removeCurrent(); } assertThat(rangeQuery.getMax(), equalTo(new DateTime(TimeValue.timeValueHours(35).millis(), DateTimeZone.UTC).getMillis())); assertThat(rangeQuery.getMin(), equalTo(new DateTime(TimeValue.timeValueHours(34).millis(), DateTimeZone.UTC).getMillis())); } - + public void testIgnoreMalformedOption() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") .startObject("properties") @@ -377,7 +377,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { Map mergedConfig = getConfigurationViaXContent(mergedFieldMapper); assertThat(mergedConfig.get("format"), is("EEE MMM dd HH:mm:ss.S Z yyyy||EEE MMM dd HH:mm:ss.SSS Z yyyy||yyyy-MM-dd'T'HH:mm:ss.SSSZZ")); } - + public void testDefaultDocValues() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") .startObject("properties").startObject("date_field").field("type", "date").endObject().endObject() From cafc7078e228ab696d0689ec8b2119cb1626e9cd Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 21 Jul 2015 16:44:58 +0200 Subject: [PATCH 02/12] Removing TransportSingleCustomOperationAction in favour of TransportSingleShardAction to clean up code. The TransportSingleCustomOperationAction `prefer_local` option has been removed as it isn't worth the effort. The TransportSingleShardAction will execute the operation on the receiving node if a concrete list doesn't provide a list of candite shards routings to perform the operation on. --- .../admin/indices/analyze/AnalyzeRequest.java | 6 +- .../analyze/AnalyzeRequestBuilder.java | 13 +- .../analyze/TransportAnalyzeAction.java | 4 +- .../get/GetFieldMappingsIndexRequest.java | 20 +- .../TransportGetFieldMappingsIndexAction.java | 4 +- .../action/explain/ExplainRequest.java | 2 +- .../explain/TransportExplainAction.java | 2 +- .../elasticsearch/action/get/GetRequest.java | 2 +- .../action/get/MultiGetShardRequest.java | 6 + .../action/get/TransportGetAction.java | 2 +- .../get/TransportShardMultiGetAction.java | 3 +- .../TransportShardMultiPercolateAction.java | 8 +- .../custom/SingleCustomOperationRequest.java | 142 -------- .../SingleCustomOperationRequestBuilder.java | 56 ---- .../TransportSingleCustomOperationAction.java | 314 ------------------ .../single/shard/SingleShardRequest.java | 31 +- .../shard/TransportSingleShardAction.java | 53 ++- .../MultiTermVectorsShardRequest.java | 6 + .../termvectors/TermVectorsRequest.java | 2 +- .../TransportShardMultiTermsVectorAction.java | 2 +- .../TransportTermVectorsAction.java | 2 +- .../indices/analyze/RestAnalyzeAction.java | 3 - docs/reference/migration/migrate_2_0.asciidoc | 6 + 23 files changed, 117 insertions(+), 572 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequestBuilder.java delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java index 655defeddaf..6482e340d1a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.analyze; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -32,7 +32,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * A request to analyze a text associated with a specific index. Allow to provide * the actual analyzer name to perform the analysis with. */ -public class AnalyzeRequest extends SingleCustomOperationRequest { +public class AnalyzeRequest extends SingleShardRequest { private String[] text; @@ -114,7 +114,7 @@ public class AnalyzeRequest extends SingleCustomOperationRequest @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = super.validate(); + ActionRequestValidationException validationException = null; if (text == null || text.length == 0) { validationException = addValidationError("text is missing", validationException); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequestBuilder.java index 2707419e304..9ed02e6be1c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequestBuilder.java @@ -18,13 +18,13 @@ */ package org.elasticsearch.action.admin.indices.analyze; -import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequestBuilder; +import org.elasticsearch.action.support.single.shard.SingleShardOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; /** * */ -public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder { +public class AnalyzeRequestBuilder extends SingleShardOperationRequestBuilder { public AnalyzeRequestBuilder(ElasticsearchClient client, AnalyzeAction action) { super(client, action, new AnalyzeRequest()); @@ -34,15 +34,6 @@ public class AnalyzeRequestBuilder extends SingleCustomOperationRequestBuilder { +public class TransportAnalyzeAction extends TransportSingleShardAction { private final IndicesService indicesService; private final IndicesAnalysisService indicesAnalysisService; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java index b8a14801199..fefcce66ffa 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java @@ -19,16 +19,17 @@ package org.elasticsearch.action.admin.indices.mapping.get; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest { +class GetFieldMappingsIndexRequest extends SingleShardRequest { private boolean probablySingleFieldRequest; private boolean includeDefaults; @@ -42,7 +43,6 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest { +public class TransportGetFieldMappingsIndexAction extends TransportSingleShardAction { private static final String ACTION_NAME = GetFieldMappingsAction.NAME + "[index]"; diff --git a/core/src/main/java/org/elasticsearch/action/explain/ExplainRequest.java b/core/src/main/java/org/elasticsearch/action/explain/ExplainRequest.java index 8a8eaee36cf..e7d703ed33d 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/ExplainRequest.java +++ b/core/src/main/java/org/elasticsearch/action/explain/ExplainRequest.java @@ -152,7 +152,7 @@ public class ExplainRequest extends SingleShardRequest { @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = super.validate(); + ActionRequestValidationException validationException = super.validateNonNullIndex(); if (type == null) { validationException = ValidateActions.addValidationError("type is missing", validationException); } diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index c8312613361..ed8de6c8ef9 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -88,7 +88,7 @@ public class TransportExplainAction extends TransportSingleShardAction { @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = super.validate(); + ActionRequestValidationException validationException = super.validateNonNullIndex(); if (type == null) { validationException = ValidateActions.addValidationError("type is missing", validationException); } diff --git a/core/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/core/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index 8a6d552807b..a45c1c1280a 100644 --- a/core/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.get; import com.carrotsearch.hppc.IntArrayList; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -54,6 +55,11 @@ public class MultiGetShardRequest extends SingleShardRequest(); } + @Override + public ActionRequestValidationException validate() { + return super.validateNonNullIndex(); + } + @Override public String[] indices() { List indices = new ArrayList<>(); diff --git a/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java b/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java deleted file mode 100644 index e1fc74dcb28..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.support.single.custom; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * - */ -public abstract class SingleCustomOperationRequest extends ActionRequest implements IndicesRequest { - - ShardId internalShardId; - - private boolean threadedOperation = true; - private boolean preferLocal = true; - private String index; - - protected SingleCustomOperationRequest() { - } - - protected SingleCustomOperationRequest(ActionRequest request) { - super(request); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - /** - * Controls if the operation will be executed on a separate thread when executed locally. - */ - public boolean operationThreaded() { - return threadedOperation; - } - - /** - * Controls if the operation will be executed on a separate thread when executed locally. - */ - @SuppressWarnings("unchecked") - public final T operationThreaded(boolean threadedOperation) { - this.threadedOperation = threadedOperation; - return (T) this; - } - - /** - * if this operation hits a node with a local relevant shard, should it be preferred - * to be executed on, or just do plain round robin. Defaults to true - */ - @SuppressWarnings("unchecked") - public final T preferLocal(boolean preferLocal) { - this.preferLocal = preferLocal; - return (T) this; - } - - @SuppressWarnings("unchecked") - public T index(String index) { - this.index = index; - return (T)this; - } - - public String index() { - return index; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - - @Override - public String[] indices() { - if (index == null) { - return Strings.EMPTY_ARRAY; - } - return new String[]{index}; - } - - /** - * if this operation hits a node with a local relevant shard, should it be preferred - * to be executed on, or just do plain round robin. Defaults to true - */ - public boolean preferLocalShard() { - return this.preferLocal; - } - - public void beforeLocalFork() { - - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - if (in.readBoolean()) { - internalShardId = ShardId.readShardId(in); - } - preferLocal = in.readBoolean(); - readIndex(in); - } - - protected void readIndex(StreamInput in) throws IOException { - index = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalStreamable(internalShardId); - out.writeBoolean(preferLocal); - writeIndex(out); - } - - protected void writeIndex(StreamOutput out) throws IOException { - out.writeOptionalString(index); - } -} - diff --git a/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequestBuilder.java deleted file mode 100644 index 46019916f46..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequestBuilder.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.support.single.custom; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.client.IndicesAdminClient; - -/** - */ -public abstract class SingleCustomOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends SingleCustomOperationRequestBuilder> - extends ActionRequestBuilder { - - - protected SingleCustomOperationRequestBuilder(ElasticsearchClient client, Action action, Request request) { - super(client, action, request); - } - - /** - * Controls if the operation will be executed on a separate thread when executed locally. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setOperationThreaded(boolean threadedOperation) { - request.operationThreaded(threadedOperation); - return (RequestBuilder) this; - } - - /** - * if this operation hits a node with a local relevant shard, should it be preferred - * to be executed on, or just do plain round robin. Defaults to true - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setPreferLocal(boolean preferLocal) { - request.preferLocal(preferLocal); - return (RequestBuilder) this; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java b/core/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java deleted file mode 100644 index 12b62885cbd..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.support.single.custom; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardsIterator; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -/** - * Transport action used to send a read request to one of the shards that belong to an index. - * Supports retrying another shard in case of failure. - */ -public abstract class TransportSingleCustomOperationAction extends HandledTransportAction { - - protected final ClusterService clusterService; - protected final TransportService transportService; - - final String transportShardAction; - final String executor; - - protected TransportSingleCustomOperationAction(Settings settings, String actionName, ThreadPool threadPool, - ClusterService clusterService, TransportService transportService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Class request, String executor) { - super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); - this.clusterService = clusterService; - this.transportService = transportService; - - this.transportShardAction = actionName + "[s]"; - this.executor = executor; - - transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler()); - } - - @Override - protected void doExecute(Request request, ActionListener listener) { - new AsyncSingleAction(request, listener).start(); - } - - /** - * Can return null to execute on this local node. - */ - protected abstract ShardsIterator shards(ClusterState state, InternalRequest request); - - /** - * Operation to be executed at the shard level. Can be called with shardId set to null, meaning that there is no - * shard involved and the operation just needs to be executed on the local node. - */ - protected abstract Response shardOperation(Request request, ShardId shardId); - - protected abstract Response newResponse(); - - protected ClusterBlockException checkGlobalBlock(ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.READ); - } - - protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) { - return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.concreteIndex()); - } - - protected abstract boolean resolveIndex(Request request); - - private class AsyncSingleAction { - - private final ActionListener listener; - - private final ShardsIterator shardsIt; - - private final InternalRequest internalRequest; - - private final DiscoveryNodes nodes; - - private AsyncSingleAction(Request request, ActionListener listener) { - this.listener = listener; - - ClusterState clusterState = clusterService.state(); - nodes = clusterState.nodes(); - ClusterBlockException blockException = checkGlobalBlock(clusterState); - if (blockException != null) { - throw blockException; - } - - String concreteSingleIndex; - if (resolveIndex(request)) { - concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request); - } else { - concreteSingleIndex = request.index(); - } - this.internalRequest = new InternalRequest(request, concreteSingleIndex); - - blockException = checkRequestBlock(clusterState, internalRequest); - if (blockException != null) { - throw blockException; - } - this.shardsIt = shards(clusterState, internalRequest); - } - - public void start() { - performFirst(); - } - - private void onFailure(ShardRouting shardRouting, Throwable e) { - if (logger.isTraceEnabled() && e != null) { - logger.trace(shardRouting.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); - } - perform(e); - } - - /** - * First get should try and use a shard that exists on a local node for better performance - */ - private void performFirst() { - if (shardsIt == null) { - // just execute it on the local node - if (internalRequest.request().operationThreaded()) { - internalRequest.request().beforeLocalFork(); - threadPool.executor(executor).execute(new Runnable() { - @Override - public void run() { - try { - Response response = shardOperation(internalRequest.request(), null); - listener.onResponse(response); - } catch (Throwable e) { - onFailure(null, e); - } - } - }); - return; - } else { - try { - final Response response = shardOperation(internalRequest.request(), null); - listener.onResponse(response); - return; - } catch (Throwable e) { - onFailure(null, e); - } - } - return; - } - - if (internalRequest.request().preferLocalShard()) { - boolean foundLocal = false; - ShardRouting shardX; - while ((shardX = shardsIt.nextOrNull()) != null) { - final ShardRouting shard = shardX; - if (shard.currentNodeId().equals(nodes.localNodeId())) { - foundLocal = true; - if (internalRequest.request().operationThreaded()) { - internalRequest.request().beforeLocalFork(); - threadPool.executor(executor).execute(new Runnable() { - @Override - public void run() { - try { - Response response = shardOperation(internalRequest.request(), shard.shardId()); - listener.onResponse(response); - } catch (Throwable e) { - shardsIt.reset(); - onFailure(shard, e); - } - } - }); - return; - } else { - try { - final Response response = shardOperation(internalRequest.request(), shard.shardId()); - listener.onResponse(response); - return; - } catch (Throwable e) { - shardsIt.reset(); - onFailure(shard, e); - } - } - } - } - if (!foundLocal) { - // no local node get, go remote - shardsIt.reset(); - perform(null); - } - } else { - perform(null); - } - } - - private void perform(final Throwable lastException) { - final ShardRouting shard = shardsIt == null ? null : shardsIt.nextOrNull(); - if (shard == null) { - Throwable failure = lastException; - if (failure == null) { - failure = new NoShardAvailableActionException(null, "No shard available for [" + internalRequest.request() + "]"); - } else { - if (logger.isDebugEnabled()) { - logger.debug("failed to execute [" + internalRequest.request() + "]", failure); - } - } - listener.onFailure(failure); - } else { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - // we don't prefer local shard, so try and do it here - if (!internalRequest.request().preferLocalShard()) { - try { - if (internalRequest.request().operationThreaded()) { - internalRequest.request().beforeLocalFork(); - threadPool.executor(executor).execute(new Runnable() { - @Override - public void run() { - try { - Response response = shardOperation(internalRequest.request(), shard.shardId()); - listener.onResponse(response); - } catch (Throwable e) { - onFailure(shard, e); - } - } - }); - } else { - final Response response = shardOperation(internalRequest.request(), shard.shardId()); - listener.onResponse(response); - } - } catch (Throwable e) { - onFailure(shard, e); - } - } else { - perform(lastException); - } - } else { - DiscoveryNode node = nodes.get(shard.currentNodeId()); - internalRequest.request().internalShardId = shard.shardId(); - transportService.sendRequest(node, transportShardAction, internalRequest.request(), new BaseTransportResponseHandler() { - @Override - public Response newInstance() { - return newResponse(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(final Response response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - onFailure(shard, exp); - } - }); - } - } - } - } - - private class ShardTransportHandler implements TransportRequestHandler { - - @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - Response response = shardOperation(request, request.internalShardId); - channel.sendResponse(response); - } - } - - /** - * Internal request class that gets built on each node. Holds the original request plus additional info. - */ - protected class InternalRequest { - final Request request; - final String concreteIndex; - - InternalRequest(Request request, String concreteIndex) { - this.request = request; - this.concreteIndex = concreteIndex; - } - - public Request request() { - return request; - } - - public String concreteIndex() { - return concreteIndex; - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardRequest.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardRequest.java index 180ea877618..edc54735f80 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/SingleShardRequest.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -35,10 +36,17 @@ import java.io.IOException; */ public abstract class SingleShardRequest extends ActionRequest implements IndicesRequest { - ShardId internalShardId; - - protected String index; public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + + /** + * The concrete index name + * + * Whether index property is optional depends on the concrete implementation. If index property is required the + * concrete implementation should use {@link #validateNonNullIndex()} to check if the index property has been set + */ + @Nullable + protected String index; + ShardId internalShardId; private boolean threadedOperation = true; protected SingleShardRequest() { @@ -57,8 +65,10 @@ public abstract class SingleShardRequest extends A this.index = index; } - @Override - public ActionRequestValidationException validate() { + /** + * @return a validation exception if the index property hasn't been set + */ + protected ActionRequestValidationException validateNonNullIndex() { ActionRequestValidationException validationException = null; if (index == null) { validationException = ValidateActions.addValidationError("index is missing", validationException); @@ -66,6 +76,13 @@ public abstract class SingleShardRequest extends A return validationException; } + /** + * @return The concrete index this request is targeted for or null if index is optional. + * Whether index property is optional depends on the concrete implementation. If index property + * is required the concrete implementation should use {@link #validateNonNullIndex()} to check + * if the index property has been set + */ + @Nullable public String index() { return index; } @@ -111,7 +128,7 @@ public abstract class SingleShardRequest extends A if (in.readBoolean()) { internalShardId = ShardId.readShardId(in); } - index = in.readString(); + index = in.readOptionalString(); // no need to pass threading over the network, they are always false when coming throw a thread pool } @@ -119,7 +136,7 @@ public abstract class SingleShardRequest extends A public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalStreamable(internalShardId); - out.writeString(index); + out.writeOptionalString(index); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index ad174613e0e..8af13cb50ff 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -32,9 +32,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; @@ -43,7 +44,9 @@ import org.elasticsearch.transport.*; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; /** - * A base class for single shard read operations. + * A base class for operations that need to perform a read operation on a single shard copy. If the operation fails, + * the read operation can be performed on other shard copies. Concrete implementations can provide their own list + * of candidate shards to try the read operation on. */ public abstract class TransportSingleShardAction extends TransportAction { @@ -88,7 +91,7 @@ public abstract class TransportSingleShardActionnull the execute + * the operation locally (the node that received the request) + */ + @Nullable + protected abstract ShardsIterator shards(ClusterState state, InternalRequest request); class AsyncSingleAction { private final ActionListener listener; - private final ShardIterator shardIt; + private final ShardsIterator shardIt; private final InternalRequest internalRequest; private final DiscoveryNodes nodes; private volatile Throwable lastFailure; @@ -126,7 +134,7 @@ public abstract class TransportSingleShardAction() { + @Override + public Response newInstance() { + return newResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(final Response response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + perform(exp); + } + }); + } else { + perform(null); + } } private void onFailure(ShardRouting shardRouting, Throwable e) { @@ -163,10 +196,10 @@ public abstract class TransportSingleShardAction() { diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsShardRequest.java b/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsShardRequest.java index c33e32eabb4..ee02e69b557 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/MultiTermVectorsShardRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.termvectors; import com.carrotsearch.hppc.IntArrayList; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -47,6 +48,11 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest(); } + @Override + public ActionRequestValidationException validate() { + return super.validateNonNullIndex(); + } + public int shardId() { return this.shardId; } diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java index 645e8707448..e8d5921b382 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java @@ -472,7 +472,7 @@ public class TermVectorsRequest extends SingleShardRequest i @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = super.validate(); + ActionRequestValidationException validationException = super.validateNonNullIndex(); if (type == null) { validationException = ValidateActions.addValidationError("type is missing", validationException); } diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java index 2633b72a38f..1aeacc5f7df 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java @@ -63,7 +63,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc } @Override - protected boolean resolveIndex() { + protected boolean resolveIndex(MultiTermVectorsShardRequest request) { return false; } diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 0a909618c00..64ff6e06adc 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -65,7 +65,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction Date: Thu, 23 Jul 2015 23:58:42 +0900 Subject: [PATCH 03/12] Interval time is different from the doc value. I think the value is 30. --- .../java/org/elasticsearch/watcher/ResourceWatcherService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java index 4719bd997d7..5ff6525a428 100644 --- a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java +++ b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java @@ -50,7 +50,7 @@ public class ResourceWatcherService extends AbstractLifecycleComponent Date: Wed, 22 Jul 2015 09:43:12 -0600 Subject: [PATCH 04/12] Add shadow indicator when using shadow replicas --- .../cluster/metadata/MetaDataCreateIndexService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 1b97802a449..c3773cec62c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -420,7 +420,10 @@ public class MetaDataCreateIndexService extends AbstractComponent { .put(indexMetaData, false) .build(); - logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet()); + String maybeShadowIndicator = IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) ? "s" : ""; + logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}{}], mappings {}", + request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), + indexMetaData.numberOfReplicas(), maybeShadowIndicator, mappings.keySet()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); if (!request.blocks().isEmpty()) { From 878301c795e6ad2c47c620c893b3324a564e0d91 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 23 Jul 2015 11:25:58 -0400 Subject: [PATCH 05/12] Dump stack traces of all threads on test failure due to AssertionError Closes #12425 --- .../test/ElasticsearchTestCase.java | 43 ++----------- .../org/elasticsearch/test/StackTraces.java | 63 +++++++++++++++++++ .../AssertionErrorThreadDumpPrinter.java | 37 +++++++++++ 3 files changed, 104 insertions(+), 39 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/test/StackTraces.java create mode 100644 core/src/test/java/org/elasticsearch/test/junit/listeners/AssertionErrorThreadDumpPrinter.java diff --git a/core/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/core/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java index 73a107dca29..a25ca6f26cb 100644 --- a/core/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java @@ -54,6 +54,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.cache.recycler.MockBigArrays; import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.test.junit.listeners.AssertionErrorThreadDumpPrinter; import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.test.search.MockSearchService; @@ -78,7 +79,8 @@ import static com.google.common.collect.Lists.newArrayList; */ @Listeners({ ReproduceInfoPrinter.class, - LoggingListener.class + LoggingListener.class, + AssertionErrorThreadDumpPrinter.class }) // remove this entire annotation on upgrade to 5.3! @ThreadLeakFilters(defaultFilters = true, filters = { @@ -550,44 +552,7 @@ public abstract class ElasticsearchTestCase extends LuceneTestCase { protected static final void printStackDump(ESLogger logger) { // print stack traces if we can't create any native thread anymore Map allStackTraces = Thread.getAllStackTraces(); - logger.error(formatThreadStacks(allStackTraces)); - } - - /** Dump threads and their current stack trace. */ - private static String formatThreadStacks(Map threads) { - StringBuilder message = new StringBuilder(); - int cnt = 1; - final Formatter f = new Formatter(message, Locale.ENGLISH); - for (Map.Entry e : threads.entrySet()) { - if (e.getKey().isAlive()) { - f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush(); - } - if (e.getValue().length == 0) { - message.append("\n at (empty stack)"); - } else { - for (StackTraceElement ste : e.getValue()) { - message.append("\n at ").append(ste); - } - } - } - return message.toString(); - } - - private static String threadName(Thread t) { - return "Thread[" + - "id=" + t.getId() + - ", name=" + t.getName() + - ", state=" + t.getState() + - ", group=" + groupName(t.getThreadGroup()) + - "]"; - } - - private static String groupName(ThreadGroup threadGroup) { - if (threadGroup == null) { - return "{null group}"; - } else { - return threadGroup.getName(); - } + logger.error(StackTraces.formatThreadStacks(allStackTraces)); } /** diff --git a/core/src/test/java/org/elasticsearch/test/StackTraces.java b/core/src/test/java/org/elasticsearch/test/StackTraces.java new file mode 100644 index 00000000000..85cd76d815e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/test/StackTraces.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test; + +import java.util.Formatter; +import java.util.Locale; +import java.util.Map; + +public class StackTraces { + /** Dump threads and their current stack trace. */ + public static String formatThreadStacks(Map threads) { + StringBuilder message = new StringBuilder(); + int cnt = 1; + final Formatter f = new Formatter(message, Locale.ENGLISH); + for (Map.Entry e : threads.entrySet()) { + if (e.getKey().isAlive()) { + f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush(); + } + if (e.getValue().length == 0) { + message.append("\n at (empty stack)"); + } else { + for (StackTraceElement ste : e.getValue()) { + message.append("\n at ").append(ste); + } + } + } + return message.toString(); + } + + private static String groupName(ThreadGroup threadGroup) { + if (threadGroup == null) { + return "{null group}"; + } else { + return threadGroup.getName(); + } + } + + private static String threadName(Thread t) { + return "Thread[" + + "id=" + t.getId() + + ", name=" + t.getName() + + ", state=" + t.getState() + + ", group=" + groupName(t.getThreadGroup()) + + "]"; + } +} diff --git a/core/src/test/java/org/elasticsearch/test/junit/listeners/AssertionErrorThreadDumpPrinter.java b/core/src/test/java/org/elasticsearch/test/junit/listeners/AssertionErrorThreadDumpPrinter.java new file mode 100644 index 00000000000..4dc625ace43 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/test/junit/listeners/AssertionErrorThreadDumpPrinter.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.junit.listeners; + +import org.elasticsearch.test.StackTraces; +import org.junit.runner.notification.Failure; +import org.junit.runner.notification.RunListener; + +import java.util.Map; + +public class AssertionErrorThreadDumpPrinter extends RunListener { + @Override + public void testFailure(Failure failure) throws Exception { + if (failure.getException() instanceof AssertionError) { + Map allStackTraces = Thread.getAllStackTraces(); + String threadStacks = StackTraces.formatThreadStacks(allStackTraces); + System.err.println(threadStacks); + } + } +} From 8472775477396a5e95943945e1c6283e8c4a7fa8 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 23 Jul 2015 08:43:25 -0400 Subject: [PATCH 06/12] copy the classloader from the original settings when checking for prompts Today, when a user provides settings and specifies a classloader to be used, the classloader gets dropped when we copy the settings to check for prompt entries. This change copies the classloader when replacing the prompt placeholders and adds a test to ensure the InternalSettingsPreparer always retains the classloader. Closes #12340 --- .../node/internal/InternalSettingsPreparer.java | 2 +- .../internal/InternalSettingsPreparerTests.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java b/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java index 7a824cd1ed0..09de3a118aa 100644 --- a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java +++ b/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java @@ -180,7 +180,7 @@ public class InternalSettingsPreparer { static Settings replacePromptPlaceholders(Settings settings, Terminal terminal) { UnmodifiableIterator> iter = settings.getAsMap().entrySet().iterator(); - Settings.Builder builder = Settings.builder(); + Settings.Builder builder = Settings.builder().classLoader(settings.getClassLoaderIfSet()); while (iter.hasNext()) { Map.Entry entry = iter.next(); diff --git a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java index 2574b62ffb7..2830435055e 100644 --- a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java +++ b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java @@ -29,6 +29,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.net.URL; +import java.net.URLClassLoader; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -220,4 +222,19 @@ public class InternalSettingsPreparerTests extends ElasticsearchTestCase { assertThat(settings.get("name"), is("prompted name 0")); assertThat(settings.get("node.name"), is("prompted name 0")); } + + @Test + public void testPreserveSettingsClassloader() { + final ClassLoader classLoader = URLClassLoader.newInstance(new URL[0]); + Settings settings = settingsBuilder() + .put("foo", "bar") + .put("path.home", createTempDir()) + .classLoader(classLoader) + .build(); + + Tuple tuple = InternalSettingsPreparer.prepareSettings(settings, randomBoolean()); + + Settings preparedSettings = tuple.v1(); + assertThat(preparedSettings.getClassLoaderIfSet(), is(classLoader)); + } } From fc90c2affdcf7d8303444bf06ad6c70a9a761c00 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 23 Jul 2015 16:13:06 +0200 Subject: [PATCH 07/12] Cancel replica recovery on another sync option copy found When a replica is initializing from the primary, and we find a better node that has full sync id match, it is better to cancel the existing replica allocation and allocate it to the new node with sync id match (eventually) --- .../cluster/routing/UnassignedInfo.java | 6 +- .../gateway/GatewayAllocator.java | 1 + .../gateway/ReplicaShardAllocator.java | 65 ++++++++++++++++++- .../cluster/routing/UnassignedInfoTests.java | 3 +- .../gateway/ReplicaShardAllocatorTests.java | 52 +++++++++++++++ 5 files changed, 122 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index c546e250bed..a09b0349365 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -95,7 +95,11 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * When a shard moves from started back to initializing, for example, during shadow replica */ - REINITIALIZED; + REINITIALIZED, + /** + * A better replica location is identified and causes the existing replica allocation to be cancelled. + */ + REALLOCATED_REPLICA; } private final Reason reason; diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 4357a81961b..7b6da4dae27 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -126,6 +126,7 @@ public class GatewayAllocator extends AbstractComponent { }); // sort for priority ordering changed |= primaryShardAllocator.allocateUnassigned(allocation); + changed |= replicaShardAllocator.processExistingRecoveries(allocation); changed |= replicaShardAllocator.allocateUnassigned(allocation); return changed; } diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2d0d38d0bd2..4bf6be893e3 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -25,11 +25,11 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; @@ -40,7 +40,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; -import java.util.Iterator; import java.util.Map; /** @@ -51,6 +50,62 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { super(settings); } + /** + * Process existing recoveries of replicas and see if we need to cancel them if we find a better + * match. Today, a better match is one that has full sync id match compared to not having one in + * the previous recovery. + */ + public boolean processExistingRecoveries(RoutingAllocation allocation) { + boolean changed = false; + for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) { + nodes.next(); + for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) { + ShardRouting shard = it.next(); + if (shard.primary() == true) { + continue; + } + if (shard.initializing() == false) { + continue; + } + if (shard.relocatingNodeId() != null) { + continue; + } + + AsyncShardFetch.FetchResult shardStores = fetchData(shard, allocation); + if (shardStores.hasData() == false) { + logger.trace("{}: fetching new stores for initializing shard", shard); + continue; // still fetching + } + + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard); + assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; + TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores); + if (primaryStore == null || primaryStore.allocated() == false) { + // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) + // just let the recovery find it out, no need to do anything about it for the initializing shard + logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + continue; + } + + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores); + if (matchingNodes.getNodeWithHighestMatch() != null) { + DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); + DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); + if (currentNode.equals(nodeWithHighestMatch) == false + && matchingNodes.isNodeMatchBySyncID(currentNode) == false + && matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch) == true) { + // we found a better match that has a full sync id match, the existing allocation is not fully synced + // so we found a better one, cancel this one + it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]")); + changed = true; + } + } + } + } + return changed; + } + public boolean allocateUnassigned(RoutingAllocation allocation) { boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); @@ -236,7 +291,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { highestMatchNode = cursor.key; } } - nodeWithHighestMatch = highestMatchNode; + this.nodeWithHighestMatch = highestMatchNode; } /** @@ -248,6 +303,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { return this.nodeWithHighestMatch; } + public boolean isNodeMatchBySyncID(DiscoveryNode node) { + return nodesToSize.get(node) == Long.MAX_VALUE; + } + /** * Did we manage to find any data, regardless how well they matched or not. */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index f4617a9ee52..29e46ff1b36 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -60,7 +60,8 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase { UnassignedInfo.Reason.ALLOCATION_FAILED, UnassignedInfo.Reason.NODE_LEFT, UnassignedInfo.Reason.REROUTE_CANCELLED, - UnassignedInfo.Reason.REINITIALIZED}; + UnassignedInfo.Reason.REINITIALIZED, + UnassignedInfo.Reason.REALLOCATED_REPLICA}; for (int i = 0; i < order.length; i++) { assertThat(order[i].ordinal(), equalTo(i)); } diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index df78028750b..97c7ecdcfd9 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -219,6 +219,39 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id())); } + @Test + public void testCancelRecoveryBetterSyncId() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node3, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + } + + @Test + public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node3, false, randomBoolean() ? "MATCH" : "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + + @Test + public void testNotCancellingRecovery() { + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + testAllocator.addData(node1, true, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")) + .addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + boolean changed = testAllocator.processExistingRecoveries(allocation); + assertThat(changed, equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED); } @@ -242,6 +275,25 @@ public class ReplicaShardAllocatorTests extends ElasticsearchAllocationTestCase return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null); } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .add(IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId) + .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), false, ShardRoutingState.INITIALIZING, 10)) + .build()) + ) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); + return new RoutingAllocation(deciders, state.routingNodes(), state.nodes(), null); + } + class TestAllocator extends ReplicaShardAllocator { private Map data = null; From 316d1cb6c0336aa4db91168bf92b5964f600b634 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 22 Jul 2015 14:48:02 +0200 Subject: [PATCH 08/12] Core: Adapt IndicesClusterStateService to use allocation ids Also, as set of utility methods was introduced on ShardRouting to do various types of matching with other shard routings, giving control about what exactly should be matched (same shard id, same allocation id, all but version and shard info etc.). This is useful here, but also prepares the grounds for the change needed in #12387 (making ShardRouting.equals be strict and perform exact equality). Closes #12397 --- .../cluster/routing/AllocationId.java | 17 +- .../cluster/routing/RoutingNode.java | 2 +- .../cluster/routing/ShardRouting.java | 132 +++++++++++--- .../routing/allocation/AllocationService.java | 16 +- .../command/CancelAllocationCommand.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 15 +- .../cluster/IndicesClusterStateService.java | 19 +- .../cluster/routing/ShardRoutingTests.java | 172 ++++++++++++++++++ .../cluster/routing/TestShardRouting.java | 23 ++- .../index/shard/IndexShardTests.java | 2 +- 10 files changed, 333 insertions(+), 69 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java index ffcf9f1e80c..695b8e62252 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/AllocationId.java @@ -83,10 +83,10 @@ public class AllocationId implements ToXContent { /** * Creates a new allocation id representing a cancelled relocation. - * + *

* Note that this is expected to be called on the allocation id * of the *source* shard - * */ + */ public static AllocationId cancelRelocation(AllocationId allocationId) { assert allocationId.getRelocationId() != null; return new AllocationId(allocationId.getId(), null); @@ -94,7 +94,7 @@ public class AllocationId implements ToXContent { /** * Creates a new allocation id finalizing a relocation. - * + *

* Note that this is expected to be called on the allocation id * of the *target* shard and thus it only needs to clear the relocating id. */ @@ -120,9 +120,16 @@ public class AllocationId implements ToXContent { @Override public boolean equals(Object o) { - if (this == o) return true; + if (this == o) { + return true; + } + if (o == null) { + return false; + } AllocationId that = (AllocationId) o; - if (!id.equals(that.id)) return false; + if (!id.equals(that.id)) { + return false; + } return !(relocationId != null ? !relocationId.equals(that.relocationId) : that.relocationId != null); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 77b8f1b588a..b96eece2398 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -88,7 +88,7 @@ public class RoutingNode implements Iterable { void add(ShardRouting shard) { // TODO use Set with ShardIds for faster lookup. for (ShardRouting shardRouting : shards) { - if (shardRouting.shardId().equals(shard.shardId())) { + if (shardRouting.isSameShard(shard)) { throw new IllegalStateException("Trying to add a shard [" + shard.shardId().index().name() + "][" + shard.shardId().id() + "] to a node [" + nodeId + "] where it already exists"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 16a42571fb4..e1b499fc606 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -420,7 +420,7 @@ public final class ShardRouting implements Streamable, ToXContent { void relocate(String relocatingNodeId) { ensureNotFrozen(); version++; - assert state == ShardRoutingState.STARTED : this; + assert state == ShardRoutingState.STARTED : "current shard has to be started in order to be relocated " + this; state = ShardRoutingState.RELOCATING; this.relocatingNodeId = relocatingNodeId; this.allocationId = AllocationId.newRelocation(allocationId); @@ -467,7 +467,7 @@ public final class ShardRouting implements Streamable, ToXContent { restoreSource = null; unassignedInfo = null; // we keep the unassigned data until the shard is started if (allocationId.getRelocationId() != null) { - // target relocation + // relocation target allocationId = AllocationId.finishRelocation(allocationId); } state = ShardRoutingState.STARTED; @@ -498,6 +498,106 @@ public final class ShardRouting implements Streamable, ToXContent { primary = false; } + /** returns true if this routing has the same shardId as another */ + public boolean isSameShard(ShardRouting other) { + return index.equals(other.index) && shardId == other.shardId; + } + + /** + * returns true if this routing has the same allocation ID as another. + *

+ * Note: if both shard routing has a null as their {@link #allocationId()}, this method returns false as the routing describe + * no allocation at all.. + **/ + public boolean isSameAllocation(ShardRouting other) { + boolean b = this.allocationId != null && other.allocationId != null && this.allocationId.getId().equals(other.allocationId.getId()); + assert b == false || this.currentNodeId.equals(other.currentNodeId) : "ShardRoutings have the same allocation id but not the same node. This [" + this + "], other [" + other + "]"; + return b; + } + + /** returns true if the routing is the relocation target of the given routing */ + public boolean isRelocationTargetOf(ShardRouting other) { + boolean b = this.allocationId != null && other.allocationId != null && this.state == ShardRoutingState.INITIALIZING && + this.allocationId.getId().equals(other.allocationId.getRelocationId()); + + assert b == false || other.state == ShardRoutingState.RELOCATING : + "ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]"; + + + assert b == false || other.allocationId.getId().equals(this.allocationId.getRelocationId()) : + "ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; + + assert b == false || other.currentNodeId().equals(this.relocatingNodeId) : + "ShardRouting is a relocation target but source current node id isn't equal to target relocating node. This [" + this + "], other [" + other + "]"; + + assert b == false || this.currentNodeId().equals(other.relocatingNodeId) : + "ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]"; + + assert b == false || isSameShard(other) : + "ShardRouting is a relocation target but both routings are not of the same shard. This [" + this + "], other [" + other + "]"; + + assert b == false || this.primary == other.primary : + "ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]"; + + return b; + } + + /** returns true if the routing is the relocation source for the given routing */ + public boolean isRelocationSourceOf(ShardRouting other) { + boolean b = this.allocationId != null && other.allocationId != null && other.state == ShardRoutingState.INITIALIZING && + other.allocationId.getId().equals(this.allocationId.getRelocationId()); + + assert b == false || this.state == ShardRoutingState.RELOCATING : + "ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]"; + + + assert b == false || this.allocationId.getId().equals(other.allocationId.getRelocationId()) : + "ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId. This [" + this + "], other [" + other + "]"; + + assert b == false || this.currentNodeId().equals(other.relocatingNodeId) : + "ShardRouting is a relocation source but current node isn't equal to other's relocating node. This [" + this + "], other [" + other + "]"; + + assert b == false || other.currentNodeId().equals(this.relocatingNodeId) : + "ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]"; + + assert b == false || isSameShard(other) : + "ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]"; + + assert b == false || this.primary == other.primary : + "ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]"; + + return b; + } + + /** returns true if the current routing is identical to the other routing in all but meta fields, i.e., version and unassigned info */ + public boolean equalsIgnoringMetaData(ShardRouting other) { + if (primary != other.primary) { + return false; + } + if (shardId != other.shardId) { + return false; + } + if (currentNodeId != null ? !currentNodeId.equals(other.currentNodeId) : other.currentNodeId != null) { + return false; + } + if (index != null ? !index.equals(other.index) : other.index != null) { + return false; + } + if (relocatingNodeId != null ? !relocatingNodeId.equals(other.relocatingNodeId) : other.relocatingNodeId != null) { + return false; + } + if (allocationId != null ? !allocationId.equals(other.allocationId) : other.allocationId != null) { + return false; + } + if (state != other.state) { + return false; + } + if (restoreSource != null ? !restoreSource.equals(other.restoreSource) : other.restoreSource != null) { + return false; + } + return true; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -508,32 +608,8 @@ public final class ShardRouting implements Streamable, ToXContent { return false; } ShardRouting that = (ShardRouting) o; - - if (primary != that.primary) { - return false; - } - if (shardId != that.shardId) { - return false; - } - if (currentNodeId != null ? !currentNodeId.equals(that.currentNodeId) : that.currentNodeId != null) { - return false; - } - if (index != null ? !index.equals(that.index) : that.index != null) { - return false; - } - if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null) { - return false; - } - if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) { - return false; - } - if (state != that.state) { - return false; - } - if (restoreSource != null ? !restoreSource.equals(that.restoreSource) : that.restoreSource != null) { - return false; - } - return true; + // TODO: add version + unassigned info check. see #12387 + return equalsIgnoringMetaData(that); } private long hashVersion = version - 1; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 4374b3cfe40..a2408a67355 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -338,7 +338,7 @@ public class AllocationService extends AbstractComponent { } for (ShardRouting shard : currentRoutingNode) { - if (shard.allocationId().getId().equals(startedShard.allocationId().getId())) { + if (shard.isSameAllocation(startedShard)) { if (shard.active()) { logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard); } else { @@ -363,8 +363,7 @@ public class AllocationService extends AbstractComponent { if (sourceRoutingNode != null) { while (sourceRoutingNode.hasNext()) { ShardRouting shard = sourceRoutingNode.next(); - if (shard.allocationId().getId().equals(startedShard.allocationId().getRelocationId())) { - assert shard.relocating() : "source shard for relocation is not marked as relocating. source " + shard + ", started target " + startedShard; + if (shard.isRelocationSourceOf(startedShard)) { dirty = true; sourceRoutingNode.remove(); break; @@ -397,7 +396,7 @@ public class AllocationService extends AbstractComponent { boolean matchedShard = false; while (matchedNode.hasNext()) { ShardRouting routing = matchedNode.next(); - if (routing.allocationId().getId().equals(failedShard.allocationId().getId())) { + if (routing.isSameAllocation(failedShard)) { matchedShard = true; logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); break; @@ -428,7 +427,7 @@ public class AllocationService extends AbstractComponent { RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId()); if (relocatingFromNode != null) { for (ShardRouting shardRouting : relocatingFromNode) { - if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { + if (shardRouting.isRelocationSourceOf(failedShard)) { logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), shardRouting, unassignedInfo.shortSummary()); routingNodes.cancelRelocation(shardRouting); break; @@ -441,7 +440,7 @@ public class AllocationService extends AbstractComponent { // and the shard copy needs to be marked as unassigned if (failedShard.relocatingNodeId() != null) { - // handle relocation source shards. we need to find the target initializing shard that is recovering from, and remove it... + // handle relocation source shards. we need to find the target initializing shard that is recovering, and remove it... assert failedShard.initializing() == false; // should have been dealt with and returned assert failedShard.relocating(); @@ -449,10 +448,7 @@ public class AllocationService extends AbstractComponent { if (initializingNode != null) { while (initializingNode.hasNext()) { ShardRouting shardRouting = initializingNode.next(); - if (shardRouting.allocationId().getId().equals(failedShard.allocationId().getRelocationId())) { - assert shardRouting.initializing() : shardRouting; - assert failedShard.allocationId().getId().equals(shardRouting.allocationId().getRelocationId()) - : "found target shard's allocation relocation id is different than source"; + if (shardRouting.isRelocationTargetOf(failedShard)) { logger.trace("{} is removed due to the failure of the source shard", shardRouting); initializingNode.remove(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index dafe7723526..f4204fde268 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -177,7 +177,7 @@ public class CancelAllocationCommand implements AllocationCommand { RoutingNode relocatingFromNode = allocation.routingNodes().node(shardRouting.relocatingNodeId()); if (relocatingFromNode != null) { for (ShardRouting fromShardRouting : relocatingFromNode) { - if (fromShardRouting.shardId().equals(shardRouting.shardId()) && fromShardRouting.state() == RELOCATING) { + if (fromShardRouting.isSameShard(shardRouting) && fromShardRouting.state() == RELOCATING) { allocation.routingNodes().cancelRelocation(fromShardRouting); break; } @@ -201,7 +201,7 @@ public class CancelAllocationCommand implements AllocationCommand { if (initializingNode != null) { while (initializingNode.hasNext()) { ShardRouting initializingShardRouting = initializingNode.next(); - if (initializingShardRouting.shardId().equals(shardRouting.shardId()) && initializingShardRouting.initializing()) { + if (initializingShardRouting.isRelocationTargetOf(shardRouting)) { initializingNode.remove(); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 900603bd7a1..fa75975b981 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.shard; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; - import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.search.QueryCachingPolicy; @@ -337,14 +336,16 @@ public class IndexShard extends AbstractIndexShardComponent { if (!newRouting.shardId().equals(shardId())) { throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); } + if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { + throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting); + } try { if (currentRouting != null) { - assert newRouting.version() > currentRouting.version() : "expected: " + newRouting.version() + " > " + currentRouting.version(); if (!newRouting.primary() && currentRouting.primary()) { logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode"); } - // if its the same routing, return - if (currentRouting.equals(newRouting)) { + // if its the same routing except for some metadata info, return + if (currentRouting.equalsIgnoringMetaData(newRouting)) { this.shardRouting = newRouting; // might have a new version return; } @@ -723,12 +724,12 @@ public class IndexShard extends AbstractIndexShardComponent { public org.apache.lucene.util.Version minimumCompatibleVersion() { org.apache.lucene.util.Version luceneVersion = null; - for(Segment segment : engine().segments(false)) { + for (Segment segment : engine().segments(false)) { if (luceneVersion == null || luceneVersion.onOrAfter(segment.getVersion())) { luceneVersion = segment.getVersion(); } } - return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion; + return luceneVersion == null ? Version.indexCreated(indexSettings).luceneVersion : luceneVersion; } public SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException { @@ -1113,7 +1114,7 @@ public class IndexShard extends AbstractIndexShardComponent { } final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount()); - if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { + if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) { logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount); mergeSchedulerConfig.setMaxMergeCount(maxMergeCount); change = true; diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index b17c639ae06..9dd4aabcf0d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -507,7 +507,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent indexShard.routingEntry().version())) { - if (shardRouting.primary() && indexShard.routingEntry().primary() == false && shardRouting.initializing() && indexShard.allowsPrimaryPromotion() == false) { - logger.debug("{} reinitialize shard on primary promotion", indexShard.shardId()); - indexService.removeShard(shardId, "promoted to primary"); - } else { - // if we happen to remove the shardRouting by id above we don't need to jump in here! - indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false); - } + + if (shardHasBeenRemoved == false) { + // shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there. + assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() : + "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry; + indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false); } } if (shardRouting.initializing()) { - applyInitializingShard(event.state(),indexMetaData, shardRouting); + applyInitializingShard(event.state(), indexMetaData, shardRouting); } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index dd43b28ea05..81a187592b7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ElasticsearchTestCase; @@ -48,6 +49,177 @@ public class ShardRoutingTests extends ElasticsearchTestCase { } } + public void testIsSameAllocation() { + ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED, 1); + ShardRouting unassignedShard1 = TestShardRouting.newShardRouting("test", 1, null, false, ShardRoutingState.UNASSIGNED, 1); + ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + ShardRouting startedShard0 = new ShardRouting(initializingShard0); + startedShard0.moveToStarted(); + ShardRouting startedShard1 = new ShardRouting(initializingShard1); + startedShard1.moveToStarted(); + + // test identity + assertTrue(initializingShard0.isSameAllocation(initializingShard0)); + + // test same allocation different state + assertTrue(initializingShard0.isSameAllocation(startedShard0)); + + // test unassigned is false even to itself + assertFalse(unassignedShard0.isSameAllocation(unassignedShard0)); + + // test different shards/nodes/state + assertFalse(unassignedShard0.isSameAllocation(unassignedShard1)); + assertFalse(unassignedShard0.isSameAllocation(initializingShard0)); + assertFalse(unassignedShard0.isSameAllocation(initializingShard1)); + assertFalse(unassignedShard0.isSameAllocation(startedShard1)); + } + + public void testIsSameShard() { + ShardRouting index1Shard0a = randomShardRouting("index1", 0); + ShardRouting index1Shard0b = randomShardRouting("index1", 0); + ShardRouting index1Shard1 = randomShardRouting("index1", 1); + ShardRouting index2Shard0 = randomShardRouting("index2", 0); + ShardRouting index2Shard1 = randomShardRouting("index2", 1); + + assertTrue(index1Shard0a.isSameShard(index1Shard0a)); + assertTrue(index1Shard0a.isSameShard(index1Shard0b)); + assertFalse(index1Shard0a.isSameShard(index1Shard1)); + assertFalse(index1Shard0a.isSameShard(index2Shard0)); + assertFalse(index1Shard0a.isSameShard(index2Shard1)); + } + + private ShardRouting randomShardRouting(String index, int shard) { + ShardRoutingState state = randomFrom(ShardRoutingState.values()); + return TestShardRouting.newShardRouting(index, shard, state == ShardRoutingState.UNASSIGNED ? null : "1", state != ShardRoutingState.UNASSIGNED && randomBoolean(), state, randomInt(5)); + } + + public void testIsSourceTargetRelocation() { + ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED, 1); + ShardRouting initializingShard0 = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + ShardRouting initializingShard1 = TestShardRouting.newShardRouting("test", 1, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1); + ShardRouting startedShard0 = new ShardRouting(initializingShard0); + startedShard0.moveToStarted(); + ShardRouting startedShard1 = new ShardRouting(initializingShard1); + startedShard1.moveToStarted(); + ShardRouting sourceShard0a = new ShardRouting(startedShard0); + sourceShard0a.relocate("node2"); + ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard(); + ShardRouting sourceShard0b = new ShardRouting(startedShard0); + sourceShard0b.relocate("node2"); + ShardRouting sourceShard1 = new ShardRouting(startedShard1); + sourceShard1.relocate("node2"); + + // test true scenarios + assertTrue(targetShard0a.isRelocationTargetOf(sourceShard0a)); + assertTrue(sourceShard0a.isRelocationSourceOf(targetShard0a)); + + // test two shards are not mixed + assertFalse(targetShard0a.isRelocationTargetOf(sourceShard1)); + assertFalse(sourceShard1.isRelocationSourceOf(targetShard0a)); + + // test two allocations are not mixed + assertFalse(targetShard0a.isRelocationTargetOf(sourceShard0b)); + assertFalse(sourceShard0b.isRelocationSourceOf(targetShard0a)); + + // test different shard states + assertFalse(targetShard0a.isRelocationTargetOf(unassignedShard0)); + assertFalse(sourceShard0a.isRelocationTargetOf(unassignedShard0)); + assertFalse(unassignedShard0.isRelocationSourceOf(targetShard0a)); + assertFalse(unassignedShard0.isRelocationSourceOf(sourceShard0a)); + + assertFalse(targetShard0a.isRelocationTargetOf(initializingShard0)); + assertFalse(sourceShard0a.isRelocationTargetOf(initializingShard0)); + assertFalse(initializingShard0.isRelocationSourceOf(targetShard0a)); + assertFalse(initializingShard0.isRelocationSourceOf(sourceShard0a)); + + assertFalse(targetShard0a.isRelocationTargetOf(startedShard0)); + assertFalse(sourceShard0a.isRelocationTargetOf(startedShard0)); + assertFalse(startedShard0.isRelocationSourceOf(targetShard0a)); + assertFalse(startedShard0.isRelocationSourceOf(sourceShard0a)); + } + + public void testEqualsIgnoringVersion() { + ShardRouting routing = randomShardRouting("test", 0); + + ShardRouting otherRouting = new ShardRouting(routing); + + assertTrue("expected equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting)); + otherRouting = new ShardRouting(routing, 1); + assertTrue("expected equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting)); + + + otherRouting = new ShardRouting(routing); + Integer[] changeIds = new Integer[]{0, 1, 2, 3, 4, 5, 6}; + for (int changeId : randomSubsetOf(randomIntBetween(1, changeIds.length), changeIds)) { + switch (changeId) { + case 0: + // change index + otherRouting = TestShardRouting.newShardRouting(otherRouting.index() + "a", otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 1: + // change shard id + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id() + 1, otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 2: + // change current node + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId() == null ? "1" : otherRouting.currentNodeId() + "_1", otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 3: + // change relocating node + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), + otherRouting.relocatingNodeId() == null ? "1" : otherRouting.relocatingNodeId() + "_1", + otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 4: + // change restore source + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource() == null ? new RestoreSource(new SnapshotId("test", "s1"), Version.CURRENT, "test") : + new RestoreSource(otherRouting.restoreSource().snapshotId(), Version.CURRENT, otherRouting.index() + "_1"), + otherRouting.primary(), otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 5: + // change primary flag + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary() == false, otherRouting.state(), otherRouting.version(), otherRouting.unassignedInfo()); + break; + case 6: + // change state + ShardRoutingState newState; + do { + newState = randomFrom(ShardRoutingState.values()); + } while (newState == otherRouting.state()); + + UnassignedInfo unassignedInfo = otherRouting.unassignedInfo(); + if (unassignedInfo == null && (newState == ShardRoutingState.UNASSIGNED || newState == ShardRoutingState.INITIALIZING)) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test"); + } + + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary(), newState, otherRouting.version(), unassignedInfo); + break; + } + + if (randomBoolean()) { + // change version + otherRouting = new ShardRouting(otherRouting, otherRouting.version() + 1); + } + + if (randomBoolean()) { + // change unassigned info + otherRouting = TestShardRouting.newShardRouting(otherRouting.index(), otherRouting.id(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), + otherRouting.restoreSource(), otherRouting.primary(), otherRouting.state(), otherRouting.version(), + otherRouting.unassignedInfo() == null ? new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test") : + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, otherRouting.unassignedInfo().getMessage() + "_1")); + } + + logger.debug("comparing\nthis {} to\nother {}", routing, otherRouting); + assertFalse("expected non-equality\nthis " + routing + ",\nother " + otherRouting, routing.equalsIgnoringMetaData(otherRouting)); + } + } public void testFrozenOnRoutingTable() { MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 82d3afc6e91..9f1f8296a6b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.test.ElasticsearchTestCase; + /** * A helper that allows to create shard routing instances within tests, while not requiring to expose * different simplified constructors on the ShardRouting itself. @@ -26,19 +28,19 @@ package org.elasticsearch.cluster.routing; public class TestShardRouting { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, null, buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, null, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, boolean primary, ShardRoutingState state, AllocationId allocationId, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, null, allocationId, true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version, buildUnassignedInfo(state), allocationId, true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) { - return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null, buildAllocationId(state), true); + return new ShardRouting(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, buildUnassignedInfo(state), buildAllocationId(state), true); } public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, @@ -61,4 +63,17 @@ public class TestShardRouting { throw new IllegalStateException("illegal state"); } } + + private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) { + switch (state) { + case UNASSIGNED: + case INITIALIZING: + return new UnassignedInfo(ElasticsearchTestCase.randomFrom(UnassignedInfo.Reason.values()), "auto generated for test"); + case STARTED: + case RELOCATING: + return null; + default: + throw new IllegalStateException("illegal state"); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0e1502aa5e9..dbedad75077 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -205,7 +205,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - routing = TestShardRouting.newShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), null, null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1); + routing = TestShardRouting.newShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.allocationId(), shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shard.deleteShardState(); From c781c89e2c6779e395ef1c782a4daa0fdcebdaa2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 24 Jul 2015 13:33:06 +0200 Subject: [PATCH 09/12] ShardUtils#getElasticsearchLeafReader() should use FilterLeafReader#getDelegate() instead of FilterLeafReader#unwrap If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that may be instance of ElasticsearchLeafReader. By using #getDelegate() method we can check each filter reader layer if it is instance of ElasticsearchLeafReader, so that we never skip over any wrapped filtered leaf reader and lose the shard id. --- .../main/java/org/elasticsearch/index/shard/ShardUtils.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java index cc9b6e2a5a6..5d7635904e1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardUtils.java @@ -63,7 +63,11 @@ public final class ShardUtils { if (reader instanceof ElasticsearchLeafReader) { return (ElasticsearchLeafReader) reader; } else { - return getElasticsearchLeafReader(FilterLeafReader.unwrap(reader)); + // We need to use FilterLeafReader#getDelegate and not FilterLeafReader#unwrap, because + // If there are multiple levels of filtered leaf readers then with the unwrap() method it immediately + // returns the most inner leaf reader and thus skipping of over any other filtered leaf reader that + // may be instance of ElasticsearchLeafReader. This can cause us to miss the shardId. + return getElasticsearchLeafReader(((FilterLeafReader) reader).getDelegate()); } } return null; From e997342da48fe23fac8f4eb4982ea1d650b4ec38 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 20 Jul 2015 10:05:44 +0200 Subject: [PATCH 10/12] Added IndexSearcherWrapper extension point. This extension point allows one IndexSearcherWrapper instance to intercept the searcher from the Engine before it is used for a opertion. --- .../elasticsearch/index/engine/Engine.java | 10 +- .../index/engine/EngineConfig.java | 10 +- .../index/engine/IndexSearcherWrapper.java | 45 +++++++++ .../engine/IndexSearcherWrappingService.java | 94 +++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 6 +- .../index/shard/IndexShardModule.java | 7 ++ .../index/shard/ShadowIndexShard.java | 5 +- .../index/engine/InternalEngineTests.java | 40 ++++++-- .../index/engine/ShadowEngineTests.java | 3 +- 9 files changed, 197 insertions(+), 23 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java create mode 100644 core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 33ba72e65ca..14181cc4c31 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.google.common.base.Preconditions; - import org.apache.lucene.index.*; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -45,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; @@ -57,11 +55,7 @@ import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -288,7 +282,7 @@ public abstract class Engine implements Closeable { try { final Searcher retVal = newSearcher(source, searcher, manager); success = true; - return retVal; + return config().getWrappingService().wrap(engineConfig, retVal); } finally { if (!success) { manager.release(searcher); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 576e6dadb0d..778509a97dd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -22,7 +22,6 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.MergePolicy; -import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.similarities.Similarity; @@ -35,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.indexing.ShardIndexingService; +import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; @@ -77,6 +77,7 @@ public final class EngineConfig { private final boolean forceNewTranslog; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; + private final IndexSearcherWrappingService wrappingService; /** * Index setting for index concurrency / number of threadstates in the indexwriter. @@ -143,7 +144,7 @@ public final class EngineConfig { Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) { + TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -157,6 +158,7 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; + this.wrappingService = wrappingService; this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false); this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65))); @@ -421,6 +423,10 @@ public final class EngineConfig { return queryCachingPolicy; } + public IndexSearcherWrappingService getWrappingService() { + return wrappingService; + } + /** * Returns the translog config for this engine */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java new file mode 100644 index 00000000000..c8a75f447b7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; + +/** + * Extension point to add custom functionality at request time to the {@link DirectoryReader} + * and {@link IndexSearcher} managed by the {@link Engine}. + */ +public interface IndexSearcherWrapper { + + /** + * @param reader The provided directory reader to be wrapped to add custom functionality + * @return a new directory reader wrapping the provided directory reader or if no wrapping was performed + * the provided directory reader + */ + DirectoryReader wrap(DirectoryReader reader); + + /** + * @param searcher The provided index searcher to be wrapped to add custom functionality + * @return a new index searcher wrapping the provided index searcher or if no wrapping was performed + * the provided index searcher + */ + IndexSearcher wrap(IndexSearcher searcher) throws EngineException; + +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java new file mode 100644 index 00000000000..a0ea90e024e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.search.IndexSearcher; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.engine.Engine.Searcher; + +import java.util.Set; + +/** + * Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the + * configured {@link IndexSearcherWrapper} instance. This allows custom functionally to be added the {@link Searcher} + * before being used to do an operation (search, get, field stats etc.) + */ +// TODO: This needs extension point is a bit hacky now, because the IndexSearch from the engine can only be wrapped once, +// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs have good +// control over its location in the wrapping chain +public final class IndexSearcherWrappingService { + + private final IndexSearcherWrapper wrapper; + + // for unit tests: + IndexSearcherWrappingService() { + this.wrapper = null; + } + + @Inject + // Use a Set parameter here, because constructor parameter can't be optional + // and I prefer to keep the `wrapper` field final. + public IndexSearcherWrappingService(Set wrappers) { + if (wrappers.size() > 1) { + throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]"); + } + if (wrappers.isEmpty()) { + this.wrapper = null; + } else { + this.wrapper = wrappers.iterator().next(); + } + } + + /** + * If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher + * gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned. + * + * This is invoked each time a {@link Searcher} is requested to do an operation. (for example search) + */ + public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException { + if (wrapper == null) { + return engineSearcher; + } + + DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader()); + IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); + innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); + innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); + // TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point + // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten + // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times + IndexSearcher indexSearcher = wrapper.wrap(innerIndexSearcher); + if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { + return engineSearcher; + } else { + return new Engine.Searcher(engineSearcher.source(), indexSearcher) { + + @Override + public void close() throws ElasticsearchException { + engineSearcher.close(); + } + }; + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fa75975b981..6ee6504bf42 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -168,6 +168,7 @@ public class IndexShard extends AbstractIndexShardComponent { protected volatile IndexShardState state; protected final AtomicReference currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; + private final IndexSearcherWrappingService wrappingService; @Nullable private RecoveryState recoveryState; @@ -197,12 +198,13 @@ public class IndexShard extends AbstractIndexShardComponent { IndicesQueryCache indicesQueryCache, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, - ClusterService clusterService, ShardPath path, BigArrays bigArrays) { + ClusterService clusterService, ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { super(shardId, indexSettingsService.getSettings()); this.codecService = codecService; this.warmer = warmer; this.deletionPolicy = deletionPolicy; this.similarityService = similarityService; + this.wrappingService = wrappingService; Preconditions.checkNotNull(store, "Store must be provided to the index shard"); Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard"); this.engineFactory = factory; @@ -1361,7 +1363,7 @@ public class IndexShard extends AbstractIndexShardComponent { }; return new EngineConfig(shardId, threadPool, indexingService, indexSettingsService.indexSettings(), warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); + mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig); } private static class IndexShardOperationCounter extends AbstractRefCounted { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index c445ce442da..870cc4eee99 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -21,7 +21,10 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.IndexSearcherWrapper; +import org.elasticsearch.index.engine.IndexSearcherWrappingService; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.percolator.stats.ShardPercolateService; @@ -73,6 +76,10 @@ public class IndexShardModule extends AbstractModule { bind(StoreRecoveryService.class).asEagerSingleton(); bind(ShardPercolateService.class).asEagerSingleton(); bind(ShardTermVectorsService.class).asEagerSingleton(); + bind(IndexSearcherWrappingService.class).asEagerSingleton(); + // this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService + Multibinder multibinder + = Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 82eb458f6cb..7224e701751 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; +import org.elasticsearch.index.engine.IndexSearcherWrappingService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; @@ -66,13 +67,13 @@ public final class ShadowIndexShard extends IndexShard { IndexService indexService, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, EngineFactory factory, ClusterService clusterService, - ShardPath path, BigArrays bigArrays) throws IOException { + ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { super(shardId, indexSettingsService, indicesLifecycle, store, storeRecoveryService, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indicesQueryCache, shardPercolateService, codecService, termVectorsService, indexFieldDataService, indexService, warmer, deletionPolicy, similarityService, - factory, clusterService, path, bigArrays); + factory, clusterService, path, bigArrays, wrappingService); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 45a730a50f0..a64fddb503b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -236,15 +236,15 @@ public class InternalEngineTests extends ElasticsearchTestCase { } - protected InternalEngine createEngine(Store store, Path translogPath) { - return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy()); + protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) { + return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers); } - protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { - return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false); + protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) { + return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy, wrappers), false); } - public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { + public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); @@ -255,7 +255,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test } - }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + }, new TranslogHandler(shardId.index().getName()), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig); return config; } @@ -493,6 +493,32 @@ public class InternalEngineTests extends ElasticsearchTestCase { ; } + @Test + public void testIndexSearcherWrapper() throws Exception { + final AtomicInteger counter = new AtomicInteger(); + IndexSearcherWrapper wrapper = new IndexSearcherWrapper() { + + @Override + public DirectoryReader wrap(DirectoryReader reader) { + counter.incrementAndGet(); + return reader; + } + + @Override + public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { + counter.incrementAndGet(); + return searcher; + } + }; + Store store = createStore(); + Path translog = createTempDir("translog-test"); + InternalEngine engine = createEngine(store, translog, wrapper); + Engine.Searcher searcher = engine.acquireSearcher("test"); + assertThat(counter.get(), equalTo(2)); + searcher.close(); + IOUtils.close(store, engine); + } + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.acquireSearcher("test"); @@ -1985,7 +2011,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener() - , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); try { new InternalEngine(brokenConfig, false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index 08210004591..bf89a5a7baf 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -47,7 +47,6 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; -import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; @@ -226,7 +225,7 @@ public class ShadowEngineTests extends ElasticsearchTestCase { @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test - }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); + }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); return config; } From 57cbce2824dba219792a5b7080f0f09c66c5c186 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 23 Jul 2015 09:37:30 +0200 Subject: [PATCH 11/12] Allocation: ThrottlingAllocationDecider should not counting relocating shards The ThrottlingAllocationDecider is responsible to limit the number of incoming/local recoveries on a node. It therefore shouldn't count shards marked as relocating which represent the source of the recovery. Closes #12409 --- .../allocation/decider/ThrottlingAllocationDecider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 2e7d5dd3cc6..59d6cd101c4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -38,7 +38,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; * node. The default is 4 *

*

  • cluster.routing.allocation.node_concurrent_recoveries - - * restricts the number of concurrent recovery operations on a single node. The + * restricts the number of total concurrent shards initializing on a single node. The * default is 2
  • * *

    @@ -106,7 +106,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { int currentRecoveries = 0; for (ShardRouting shard : node) { - if (shard.initializing() || shard.relocating()) { + if (shard.initializing()) { currentRecoveries++; } } From 76c03df035b777b446ab961469f63d93dd9bc981 Mon Sep 17 00:00:00 2001 From: Alex Ksikes Date: Fri, 17 Jul 2015 12:15:18 +0200 Subject: [PATCH 12/12] Make sure filter is correctly parsed for multi-term vectors This makes sure the `filter` parameter is correctly parsed in a multi-term vector request when using `ids` and `parameters`. Closes #12311 Closes #12312 --- .../termvectors/TermVectorsRequest.java | 1 + .../termvectors/TermVectorsUnitTests.java | 30 +++++++++++++++++-- .../action/termvectors/multiRequest3.json | 16 ++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/termvectors/multiRequest3.json diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java index e8d5921b382..cbf6e525777 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java @@ -167,6 +167,7 @@ public class TermVectorsRequest extends SingleShardRequest i this.version = other.version(); this.versionType = VersionType.fromValue(other.versionType().getValue()); this.startTime = other.startTime(); + this.filterSettings = other.filterSettings(); } public TermVectorsRequest(MultiGetRequest.Item item) { diff --git a/core/src/test/java/org/elasticsearch/action/termvectors/TermVectorsUnitTests.java b/core/src/test/java/org/elasticsearch/action/termvectors/TermVectorsUnitTests.java index f9c4d2f39f0..49f896c748d 100644 --- a/core/src/test/java/org/elasticsearch/action/termvectors/TermVectorsUnitTests.java +++ b/core/src/test/java/org/elasticsearch/action/termvectors/TermVectorsUnitTests.java @@ -302,8 +302,8 @@ public class TermVectorsUnitTests extends ElasticsearchTestCase { request = new MultiTermVectorsRequest(); request.add(new TermVectorsRequest(), bytes); checkParsedParameters(request); - } + void checkParsedParameters(MultiTermVectorsRequest request) { Set ids = new HashSet<>(); ids.add("1"); @@ -324,5 +324,31 @@ public class TermVectorsUnitTests extends ElasticsearchTestCase { assertThat(singleRequest.selectedFields(), equalTo(fields)); } } - + + @Test // issue #12311 + public void testMultiParserFilter() throws Exception { + byte[] data = Streams.copyToBytesFromClasspath("/org/elasticsearch/action/termvectors/multiRequest3.json"); + BytesReference bytes = new BytesArray(data); + MultiTermVectorsRequest request = new MultiTermVectorsRequest(); + request.add(new TermVectorsRequest(), bytes); + checkParsedFilterParameters(request); + } + + void checkParsedFilterParameters(MultiTermVectorsRequest multiRequest) { + int id = 1; + for (TermVectorsRequest request : multiRequest.requests) { + assertThat(request.index(), equalTo("testidx")); + assertThat(request.type(), equalTo("test")); + assertThat(request.id(), equalTo(id+"")); + assertNotNull(request.filterSettings()); + assertThat(request.filterSettings().maxNumTerms, equalTo(20)); + assertThat(request.filterSettings().minTermFreq, equalTo(1)); + assertThat(request.filterSettings().maxTermFreq, equalTo(20)); + assertThat(request.filterSettings().minDocFreq, equalTo(1)); + assertThat(request.filterSettings().maxDocFreq, equalTo(20)); + assertThat(request.filterSettings().minWordLength, equalTo(1)); + assertThat(request.filterSettings().maxWordLength, equalTo(20)); + id++; + } + } } diff --git a/core/src/test/java/org/elasticsearch/action/termvectors/multiRequest3.json b/core/src/test/java/org/elasticsearch/action/termvectors/multiRequest3.json new file mode 100644 index 00000000000..457f43cdc9a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/termvectors/multiRequest3.json @@ -0,0 +1,16 @@ +{ + "ids": ["1","2"], + "parameters": { + "_index": "testidx", + "_type": "test", + "filter": { + "max_num_terms": 20, + "min_term_freq": 1, + "max_term_freq": 20, + "min_doc_freq": 1, + "max_doc_freq": 20, + "min_word_length": 1, + "max_word_length": 20 + } + } +} \ No newline at end of file