From c939bcb7f57b1810aaea5186a6853538a1083593 Mon Sep 17 00:00:00 2001 From: David Cho-Lerat Date: Tue, 30 May 2017 15:01:13 +0200 Subject: [PATCH 001/208] Correct some spelling in match-phrase-prefix docs (#24956) --- docs/reference/query-dsl/match-phrase-prefix-query.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/query-dsl/match-phrase-prefix-query.asciidoc b/docs/reference/query-dsl/match-phrase-prefix-query.asciidoc index 40cfabdc96f..73f1be9143c 100644 --- a/docs/reference/query-dsl/match-phrase-prefix-query.asciidoc +++ b/docs/reference/query-dsl/match-phrase-prefix-query.asciidoc @@ -19,7 +19,7 @@ GET /_search It accepts the same parameters as the phrase type. In addition, it also accepts a `max_expansions` parameter (default `50`) that can control to how -many prefixes the last term will be expanded. It is highly recommended to set +many suffixes the last term will be expanded. It is highly recommended to set it to an acceptable value to control the execution time of the query. For example: @@ -43,7 +43,7 @@ GET /_search =================================================== The `match_phrase_prefix` query is a poor-man's autocomplete. It is very easy -to use, which let's you get started quickly with _search-as-you-type_ but it's +to use, which lets you get started quickly with _search-as-you-type_ but its results, which usually are good enough, can sometimes be confusing. Consider the query string `quick brown f`. This query works by creating a @@ -53,7 +53,7 @@ dictionary to find the first 50 terms that begin with `f`, and adds these terms to the phrase query. The problem is that the first 50 terms may not include the term `fox` so the -phase `quick brown fox` will not be found. This usually isn't a problem as +phrase `quick brown fox` will not be found. This usually isn't a problem as the user will continue to type more letters until the word they are looking for appears. From 491dc1186ae3a0f01a921d7761c51a8e6b040d50 Mon Sep 17 00:00:00 2001 From: David Cho-Lerat Date: Tue, 30 May 2017 15:41:18 +0200 Subject: [PATCH 002/208] Add missing word to terms-query.asciidoc (#24960) --- docs/reference/query-dsl/terms-query.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/query-dsl/terms-query.asciidoc b/docs/reference/query-dsl/terms-query.asciidoc index 9782bb9208f..1c034ec3894 100644 --- a/docs/reference/query-dsl/terms-query.asciidoc +++ b/docs/reference/query-dsl/terms-query.asciidoc @@ -92,7 +92,7 @@ GET /tweets/_search -------------------------------------------------- // CONSOLE -The structure of the external terms document can also include array of +The structure of the external terms document can also include an array of inner objects, for example: [source,js] From 9957bdf0ad88e3625ef42c95a80d5edc3aac02f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 10:05:11 -0400 Subject: [PATCH 003/208] Handle primary failure handling replica response Today if the primary throws an exception while handling the replica response (e.g., because it is already closed while updating the local checkpoint for the replica), or because of a bug that causes an exception to be thrown in the replica operation listener, this exception is caught by the underlying transport handler plumbing and is translated into a response handler failure transport exception that is passed to the onFailure method of the replica operation listener. 
This causes the primary to turn around and fail the replica which is a disastrous and incorrect outcome as there's nothing wrong with the replica, it is the primary that is broken and deserves a paddlin'. This commit handles this situation by failing the primary. Relates #24926 --- .../replication/ReplicationOperation.java | 17 ++++-- .../ReplicationOperationTests.java | 53 ++++++++++++++++++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index ee8270e911c..09b6a4d8220 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -185,7 +186,15 @@ public class ReplicationOperation< @Override public void onResponse(ReplicaResponse response) { successfulShards.incrementAndGet(); - primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); + try { + primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); + } catch (final AlreadyClosedException e) { + // okay, the index was deleted or this shard was never activated after a relocation; fallthrough and finish normally + } catch (final Exception e) { + // fail the primary but fall through and let the rest of operation processing complete + final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard); + primary.failShard(message, e); + } decPendingAndFinishIfNeeded(); } @@ -321,7 +330,10 @@ public class ReplicationOperation< ShardRouting routingEntry(); /** - * fail the primary, typically due to the fact that the operation has learned the primary has been demoted by the master + * Fail the primary shard. + * + * @param message the failure message + * @param exception the exception that triggered the failure */ void failShard(String message, Exception exception); @@ -335,7 +347,6 @@ public class ReplicationOperation< */ PrimaryResultT perform(RequestT request) throws Exception; - /** * Notifies the primary of a local checkpoint for the given allocation. 
* diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9fcc8c24353..88cf5769a48 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -56,7 +57,9 @@ import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -191,8 +194,7 @@ public class ReplicationOperationTests extends ESTestCase { assertTrue(primaryFailed.compareAndSet(false, true)); } }; - final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, - () -> finalState); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, () -> finalState); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -299,6 +301,53 @@ public class ReplicationOperationTests extends ESTestCase { } } + public void testPrimaryFailureHandlingReplicaResponse() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + final Request request = new Request(shardId); + + final ClusterState state = stateWithActivePrimary(index, true, 1, 0); + final IndexMetaData indexMetaData = state.getMetaData().index(index); + final long primaryTerm = indexMetaData.primaryTerm(0); + final ShardRouting primaryRouting = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + + final boolean fatal = randomBoolean(); + final AtomicBoolean primaryFailed = new AtomicBoolean(); + final ReplicationOperation.Primary primary = new TestPrimary(primaryRouting, primaryTerm) { + + @Override + public void failShard(String message, Exception exception) { + primaryFailed.set(true); + } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + if (primaryRouting.allocationId().getId().equals(allocationId)) { + super.updateLocalCheckpointForShard(allocationId, checkpoint); + } else { + if (fatal) { + throw new NullPointerException(); + } else { + throw new AlreadyClosedException("already closed"); + } + } + } + + }; + + final PlainActionFuture listener = new PlainActionFuture<>(); + final ReplicationOperation.Replicas replicas = new TestReplicaProxy(Collections.emptyMap()); + TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, () -> state); + operation.execute(); + + 
assertThat(primaryFailed.get(), equalTo(fatal)); + final ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(0)); + assertThat(shardInfo.getFailures(), arrayWithSize(0)); + assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size())); + } + private Set getExpectedReplicas(ShardId shardId, ClusterState state) { Set expectedReplicas = new HashSet<>(); String localNodeId = state.nodes().getLocalNodeId(); From b8d7b83f8ecf96b8eff753df31422198ff295eda Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 30 May 2017 10:36:22 -0400 Subject: [PATCH 004/208] Correctly set doc_count when MovAvg "predicts" values on existing buckets (#24892) If the bucket already exists, due to non-overlapping series or missing data, the MovAvg creates a merged bucket with the existing aggs + the new prediction. This fixes a small bug where the doc_count was not being set correctly. Relates to #24327 --- .../movavg/MovAvgPipelineAggregator.java | 2 +- .../pipeline/moving/avg/MovAvgIT.java | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index 49422995c95..196f7cca473 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -161,7 +161,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { }).collect(Collectors.toList()); aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList(), metaData())); - Bucket newBucket = factory.createBucket(newKey, 0, new InternalAggregations(aggs)); + Bucket newBucket = factory.createBucket(newKey, bucket.getDocCount(), new InternalAggregations(aggs)); // Overwrite the existing bucket with the new version newBuckets.set(lastValidPosition + i + 1, newBucket); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java index f24dfe42270..bbe6ecc3a4e 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java @@ -19,9 +19,12 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; @@ -41,6 +44,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuil import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matchers; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -67,6 +71,7 @@ import static 
org.hamcrest.core.IsNull.nullValue; public class MovAvgIT extends ESIntegTestCase { private static final String INTERVAL_FIELD = "l_value"; private static final String VALUE_FIELD = "v_value"; + private static final String VALUE_FIELD2 = "v_value2"; static int interval; static int numBuckets; @@ -1204,6 +1209,68 @@ public class MovAvgIT extends ESIntegTestCase { } } + public void testPredictWithNonEmptyBuckets() throws Exception { + + createIndex("predict_non_empty"); + BulkRequestBuilder bulkBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 10; i++) { + bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource( + jsonBuilder().startObject().field(INTERVAL_FIELD, i) + .field(VALUE_FIELD, 10) + .field(VALUE_FIELD2, 10) + .endObject())); + } + for (int i = 10; i < 20; i++) { + // Extra so there is a bucket that only has second field + bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource( + jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD2, 10).endObject())); + } + + bulkBuilder.execute().actionGet(); + ensureSearchable(); + + SearchResponse response = client() + .prepareSearch("predict_non_empty") + .setTypes("type") + .addAggregation( + histogram("histo") + .field(INTERVAL_FIELD) + .interval(1) + .subAggregation(max("max").field(VALUE_FIELD)) + .subAggregation(max("max2").field(VALUE_FIELD2)) + .subAggregation( + movingAvg("movavg_values", "max") + .window(windowSize) + .modelBuilder(new SimpleModel.SimpleModelBuilder()) + .gapPolicy(BucketHelpers.GapPolicy.SKIP).predict(5))).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(20)); + + SimpleValue current = buckets.get(0).getAggregations().get("movavg_values"); + assertThat(current, nullValue()); + + for (int i = 1; i < 20; i++) { + Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo((double)i)); + assertThat(bucket.getDocCount(), equalTo(1L)); + SimpleValue movAvgAgg = bucket.getAggregations().get("movavg_values"); + if (i < 15) { + assertThat(movAvgAgg, notNullValue()); + assertThat(movAvgAgg.value(), equalTo(10d)); + } else { + assertThat(movAvgAgg, nullValue()); + } + } + } + private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) { if (!expectedBucketIter.hasNext()) { fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch"); From 6d9ce957d4aa2916ba2c3f467bda70fdf6325647 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 30 May 2017 10:39:22 -0400 Subject: [PATCH 005/208] Drop name from TokenizerFactory (#24869) Drops `TokenizerFactory#name`, replacing it with `CustomAnalyzer#getTokenizerName` which is much better targeted at its single use case inside the analysis API. Drops a test that I would have had to refactor which is duplicated by `AnalysisModuleTests`. To keep this change from blowing up in size I've left two mostly mechanical changes to be done in followups: 1. `TokenizerFactory` can now be entirely dropped and replaced with `Supplier`. 2. `AbstractTokenizerFactory`'s ctor still takes a `String` parameter where the name once was. 
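As an illustration of follow-up (1) above, here is a minimal sketch. It is not part of this patch and is only an assumption about where that follow-up could go: with name() dropped, the remaining single-method factory is shape-compatible with Supplier<Tokenizer>, so a registration could look roughly like the code below. The class name, the map key, and the choice of WhitespaceTokenizer are stand-ins invented for this example.

    import java.util.Collections;
    import java.util.Map;
    import java.util.function.Supplier;

    import org.apache.lucene.analysis.Tokenizer;
    import org.apache.lucene.analysis.core.WhitespaceTokenizer;

    public class SupplierTokenizerSketch {
        public static void main(String[] args) {
            // Once name() is gone, a tokenizer factory is just "give me a Tokenizer",
            // so it can be expressed as a Supplier<Tokenizer> (here a constructor reference)
            // keyed by the name the user configured.
            Map<String, Supplier<Tokenizer>> tokenizers =
                    Collections.singletonMap("whitespace", WhitespaceTokenizer::new);

            // The caller keeps track of the configured name itself (as CustomAnalyzer now
            // does via getTokenizerName()) and only asks the supplier for instances.
            Tokenizer tokenizer = tokenizers.get("whitespace").get();
            System.out.println(tokenizer.getClass().getSimpleName());
        }
    }

Nothing in this sketch is prescriptive; it only shows why the commit message can describe the remaining interface as replaceable by a plain Supplier.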
--- .../analyze/TransportAnalyzeAction.java | 17 ++++-- .../analysis/AbstractTokenizerFactory.java | 13 +---- .../index/analysis/CustomAnalyzer.java | 17 ++++-- .../analysis/CustomAnalyzerProvider.java | 2 +- .../analysis/CustomNormalizerProvider.java | 1 + .../analysis/PreConfiguredTokenizer.java | 17 +----- .../index/analysis/TokenizerFactory.java | 5 +- .../indices/analysis/PreBuiltTokenizers.java | 13 ----- .../indices/analysis/DummyAnalysisPlugin.java | 56 ------------------- .../indices/analysis/DummyAnalyzer.java | 33 ----------- .../analysis/DummyAnalyzerProvider.java | 40 ------------- .../analysis/DummyCharFilterFactory.java | 36 ------------ .../analysis/DummyTokenFilterFactory.java | 33 ----------- .../analysis/DummyTokenizerFactory.java | 35 ------------ .../PreBuiltAnalyzerIntegrationIT.java | 37 +----------- 15 files changed, 32 insertions(+), 323 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalysisPlugin.java delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzer.java delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzerProvider.java delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyCharFilterFactory.java delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenFilterFactory.java delete mode 100644 core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenizerFactory.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java index d7e299b1cf1..11566378085 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FastStringReader; import org.elasticsearch.common.settings.Settings; @@ -179,7 +180,8 @@ public class TransportAnalyzeAction extends TransportSingleShardAction tokenizerFactory = parseTokenizerFactory(request, indexAnalyzers, + analysisRegistry, environment); TokenFilterFactory[] tokenFilterFactories = new TokenFilterFactory[0]; tokenFilterFactories = getTokenFilterFactories(request, indexSettings, analysisRegistry, environment, tokenFilterFactories); @@ -187,7 +189,7 @@ public class TransportAnalyzeAction extends TransportSingleShardAction parseTokenizerFactory(AnalyzeRequest request, IndexAnalyzers indexAnalzyers, AnalysisRegistry analysisRegistry, Environment environment) throws IOException { + String name; TokenizerFactory tokenizerFactory; final AnalyzeRequest.NameOrDefinition tokenizer = request.tokenizer(); // parse anonymous settings @@ -568,6 +572,7 @@ public class TransportAnalyzeAction extends TransportSingleShardAction tokenizerFactoryFactory; @@ -576,18 +581,20 @@ public class TransportAnalyzeAction extends TransportSingleShardAction(name, tokenizerFactory); } private static IndexSettings getNaIndexSettings(Settings settings) { diff --git a/core/src/main/java/org/elasticsearch/index/analysis/AbstractTokenizerFactory.java 
b/core/src/main/java/org/elasticsearch/index/analysis/AbstractTokenizerFactory.java index dfa177a7fbf..bf6b2fd7c5b 100644 --- a/core/src/main/java/org/elasticsearch/index/analysis/AbstractTokenizerFactory.java +++ b/core/src/main/java/org/elasticsearch/index/analysis/AbstractTokenizerFactory.java @@ -25,23 +25,14 @@ import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.IndexSettings; public abstract class AbstractTokenizerFactory extends AbstractIndexComponent implements TokenizerFactory { - - private final String name; - protected final Version version; - - public AbstractTokenizerFactory(IndexSettings indexSettings, String name, Settings settings) { + // TODO drop `String ignored` in a followup + public AbstractTokenizerFactory(IndexSettings indexSettings, String ignored, Settings settings) { super(indexSettings); - this.name = name; this.version = Analysis.parseAnalysisVersion(this.indexSettings.getSettings(), settings, logger); } - @Override - public String name() { - return this.name; - } - public final Version version() { return version; } diff --git a/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzer.java b/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzer.java index 68799413907..d70b4628f53 100644 --- a/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzer.java +++ b/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzer.java @@ -27,6 +27,7 @@ import java.io.Reader; public final class CustomAnalyzer extends Analyzer { + private final String tokenizerName; private final TokenizerFactory tokenizerFactory; private final CharFilterFactory[] charFilters; @@ -36,12 +37,14 @@ public final class CustomAnalyzer extends Analyzer { private final int positionIncrementGap; private final int offsetGap; - public CustomAnalyzer(TokenizerFactory tokenizerFactory, CharFilterFactory[] charFilters, TokenFilterFactory[] tokenFilters) { - this(tokenizerFactory, charFilters, tokenFilters, 0, -1); + public CustomAnalyzer(String tokenizerName, TokenizerFactory tokenizerFactory, CharFilterFactory[] charFilters, + TokenFilterFactory[] tokenFilters) { + this(tokenizerName, tokenizerFactory, charFilters, tokenFilters, 0, -1); } - public CustomAnalyzer(TokenizerFactory tokenizerFactory, CharFilterFactory[] charFilters, TokenFilterFactory[] tokenFilters, - int positionIncrementGap, int offsetGap) { + public CustomAnalyzer(String tokenizerName, TokenizerFactory tokenizerFactory, CharFilterFactory[] charFilters, + TokenFilterFactory[] tokenFilters, int positionIncrementGap, int offsetGap) { + this.tokenizerName = tokenizerName; this.tokenizerFactory = tokenizerFactory; this.charFilters = charFilters; this.tokenFilters = tokenFilters; @@ -49,6 +52,12 @@ public final class CustomAnalyzer extends Analyzer { this.offsetGap = offsetGap; } + /** + * The name of the tokenizer as configured by the user. 
+ */ + public String getTokenizerName() { + return tokenizerName; + } public TokenizerFactory tokenizerFactory() { return tokenizerFactory; diff --git a/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzerProvider.java b/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzerProvider.java index f1123045622..3bf5d43375c 100644 --- a/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzerProvider.java +++ b/core/src/main/java/org/elasticsearch/index/analysis/CustomAnalyzerProvider.java @@ -80,7 +80,7 @@ public class CustomAnalyzerProvider extends AbstractIndexAnalyzerProvider create.apply(version); } } } diff --git a/core/src/main/java/org/elasticsearch/index/analysis/TokenizerFactory.java b/core/src/main/java/org/elasticsearch/index/analysis/TokenizerFactory.java index 6ca9d457cbc..be96dbd6560 100644 --- a/core/src/main/java/org/elasticsearch/index/analysis/TokenizerFactory.java +++ b/core/src/main/java/org/elasticsearch/index/analysis/TokenizerFactory.java @@ -21,9 +21,6 @@ package org.elasticsearch.index.analysis; import org.apache.lucene.analysis.Tokenizer; -public interface TokenizerFactory { - - String name(); - +public interface TokenizerFactory { // TODO replace with Supplier Tokenizer create(); } diff --git a/core/src/main/java/org/elasticsearch/indices/analysis/PreBuiltTokenizers.java b/core/src/main/java/org/elasticsearch/indices/analysis/PreBuiltTokenizers.java index 52e7ff6c9c4..9cc9ed1ea23 100644 --- a/core/src/main/java/org/elasticsearch/indices/analysis/PreBuiltTokenizers.java +++ b/core/src/main/java/org/elasticsearch/indices/analysis/PreBuiltTokenizers.java @@ -38,8 +38,6 @@ import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.index.analysis.TokenizerFactory; import org.elasticsearch.indices.analysis.PreBuiltCacheFactory.CachingStrategy; -import java.util.Locale; - public enum PreBuiltTokenizers { STANDARD(CachingStrategy.ONE) { @@ -148,14 +146,8 @@ public enum PreBuiltTokenizers { public synchronized TokenizerFactory getTokenizerFactory(final Version version) { TokenizerFactory tokenizerFactory = cache.get(version); if (tokenizerFactory == null) { - final String finalName = name().toLowerCase(Locale.ROOT); if (getMultiTermComponent(version) != null) { tokenizerFactory = new MultiTermAwareTokenizerFactory() { - @Override - public String name() { - return finalName; - } - @Override public Tokenizer create() { return PreBuiltTokenizers.this.create(version); @@ -168,11 +160,6 @@ public enum PreBuiltTokenizers { }; } else { tokenizerFactory = new TokenizerFactory() { - @Override - public String name() { - return finalName; - } - @Override public Tokenizer create() { return PreBuiltTokenizers.this.create(version); diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalysisPlugin.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalysisPlugin.java deleted file mode 100644 index bbfeacfc590..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalysisPlugin.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.analysis; - -import org.apache.lucene.analysis.Analyzer; -import org.elasticsearch.index.analysis.AnalyzerProvider; -import org.elasticsearch.index.analysis.CharFilterFactory; -import org.elasticsearch.index.analysis.TokenFilterFactory; -import org.elasticsearch.index.analysis.TokenizerFactory; -import org.elasticsearch.indices.analysis.AnalysisModule.AnalysisProvider; -import org.elasticsearch.plugins.AnalysisPlugin; -import org.elasticsearch.plugins.Plugin; - -import java.util.Map; - -import static java.util.Collections.singletonMap; - -public class DummyAnalysisPlugin extends Plugin implements AnalysisPlugin { - @Override - public Map> getCharFilters() { - return singletonMap("dummy_char_filter", (a, b, c, d) -> new DummyCharFilterFactory()); - } - - @Override - public Map> getTokenFilters() { - return singletonMap("dummy_token_filter", (a, b, c, d) -> new DummyTokenFilterFactory()); - } - - @Override - public Map> getTokenizers() { - return singletonMap("dummy_tokenizer", (a, b, c, d) -> new DummyTokenizerFactory()); - } - - @Override - public Map>> getAnalyzers() { - return singletonMap("dummy", (a, b, c, d) -> new DummyAnalyzerProvider()); - } - -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzer.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzer.java deleted file mode 100644 index 61b5d2eb319..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzer.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.analysis; - -import org.apache.lucene.analysis.StopwordAnalyzerBase; - -public class DummyAnalyzer extends StopwordAnalyzerBase { - - protected DummyAnalyzer() { - } - - @Override - protected TokenStreamComponents createComponents(String fieldName) { - return null; - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzerProvider.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzerProvider.java deleted file mode 100644 index 68beb817d70..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyAnalyzerProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.analysis; - -import org.elasticsearch.index.analysis.AnalyzerProvider; -import org.elasticsearch.index.analysis.AnalyzerScope; - -public class DummyAnalyzerProvider implements AnalyzerProvider { - @Override - public String name() { - return "dummy"; - } - - @Override - public AnalyzerScope scope() { - return AnalyzerScope.INDICES; - } - - @Override - public DummyAnalyzer get() { - return new DummyAnalyzer(); - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyCharFilterFactory.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyCharFilterFactory.java deleted file mode 100644 index 8c5896e59ec..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyCharFilterFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.analysis; - -import org.elasticsearch.index.analysis.CharFilterFactory; - -import java.io.Reader; - -public class DummyCharFilterFactory implements CharFilterFactory { - @Override - public String name() { - return "dummy_char_filter"; - } - - @Override - public Reader create(Reader reader) { - return null; - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenFilterFactory.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenFilterFactory.java deleted file mode 100644 index 489e4dce7b8..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenFilterFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.analysis; - -import org.apache.lucene.analysis.TokenStream; -import org.elasticsearch.index.analysis.TokenFilterFactory; - -public class DummyTokenFilterFactory implements TokenFilterFactory { - @Override public String name() { - return "dummy_token_filter"; - } - - @Override public TokenStream create(TokenStream tokenStream) { - return null; - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenizerFactory.java b/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenizerFactory.java deleted file mode 100644 index a27c6ae7dba..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/analysis/DummyTokenizerFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.indices.analysis; - -import org.apache.lucene.analysis.Tokenizer; -import org.elasticsearch.index.analysis.TokenizerFactory; - -public class DummyTokenizerFactory implements TokenizerFactory { - @Override - public String name() { - return "dummy_tokenizer"; - } - - @Override - public Tokenizer create() { - return null; - } -} diff --git a/core/src/test/java/org/elasticsearch/indices/analysis/PreBuiltAnalyzerIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/analysis/PreBuiltAnalyzerIntegrationIT.java index d6e93ce559e..7722795525d 100644 --- a/core/src/test/java/org/elasticsearch/indices/analysis/PreBuiltAnalyzerIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/analysis/PreBuiltAnalyzerIntegrationIT.java @@ -46,7 +46,7 @@ import static org.hamcrest.Matchers.notNullValue; public class PreBuiltAnalyzerIntegrationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(DummyAnalysisPlugin.class, InternalSettingsPlugin.class); + return Arrays.asList(InternalSettingsPlugin.class); } public void testThatPreBuiltAnalyzersAreNotClosedOnIndexClose() throws Exception { @@ -114,41 +114,6 @@ public class PreBuiltAnalyzerIntegrationIT extends ESIntegTestCase { assertLuceneAnalyzersAreNotClosed(loadedAnalyzers); } - /** - * Test case for #5030: Upgrading analysis plugins fails - * See https://github.com/elastic/elasticsearch/issues/5030 - */ - public void testThatPluginAnalyzersCanBeUpdated() throws Exception { - final XContentBuilder mapping = jsonBuilder().startObject() - .startObject("type") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .field("analyzer", "dummy") - .endObject() - .startObject("bar") - .field("type", "text") - .field("analyzer", "my_dummy") - .endObject() - .endObject() - .endObject() - .endObject(); - - Settings versionSettings = settings(randomVersion(random())) - .put("index.analysis.analyzer.my_dummy.type", "custom") - .put("index.analysis.analyzer.my_dummy.filter", "my_dummy_token_filter") - .put("index.analysis.analyzer.my_dummy.char_filter", "my_dummy_char_filter") - .put("index.analysis.analyzer.my_dummy.tokenizer", "my_dummy_tokenizer") - .put("index.analysis.tokenizer.my_dummy_tokenizer.type", "dummy_tokenizer") - .put("index.analysis.filter.my_dummy_token_filter.type", "dummy_token_filter") - .put("index.analysis.char_filter.my_dummy_char_filter.type", "dummy_char_filter") - .build(); - - client().admin().indices().prepareCreate("test-analysis-dummy").addMapping("type", mapping).setSettings(versionSettings).get(); - - ensureGreen(); - } - private void assertThatAnalyzersHaveBeenLoaded(Map> expectedLoadedAnalyzers) { for (Map.Entry> entry : expectedLoadedAnalyzers.entrySet()) { for (Version version : entry.getValue()) { From 0b3be42c10af7f79430b7b7c6dec38019f179add Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 26 May 2017 09:33:10 -0600 Subject: [PATCH 006/208] Prevent Index & Delete request primaryTerm getter/setter, setShardId setter --- .../action/delete/DeleteRequest.java | 31 ++++++++++++++++++ .../action/index/IndexRequest.java | 32 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 72d8c4e5857..776117794ba 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ 
b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -221,4 +222,34 @@ public class DeleteRequest extends ReplicatedWriteRequest impleme public String toString() { return "delete {[" + index + "][" + type + "][" + id + "]}"; } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. + */ + @Override + public long primaryTerm() { + throw new UnsupportedOperationException("primary term should never be set on DeleteRequest"); + } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. + */ + @Override + public void primaryTerm(long term) { + throw new UnsupportedOperationException("primary term should never be set on DeleteRequest"); + } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. + */ + @Override + public DeleteRequest setShardId(ShardId shardId) { + throw new UnsupportedOperationException("shard id should never be set on DeleteRequest"); + } } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 552780e7395..5667bf5f9d5 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -608,4 +609,35 @@ public class IndexRequest extends ReplicatedWriteRequest implement public long getAutoGeneratedTimestamp() { return autoGeneratedTimestamp; } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. 
+ */ + @Override + public long primaryTerm() { + throw new UnsupportedOperationException("primary term should never be set on IndexRequest"); + } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. + */ + @Override + public void primaryTerm(long term) { + throw new UnsupportedOperationException("primary term should never be set on IndexRequest"); + } + + /** + * Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't + * do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or + * use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set. + */ + @Override + public IndexRequest setShardId(ShardId shardId) { + throw new UnsupportedOperationException("shard id should never be set on IndexRequest"); + } + } From 15fc71249c8b90e35e86cc43061ec3cc06ae5a2f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 11:15:07 -0400 Subject: [PATCH 007/208] Fix typo in comment in ReplicationOperation.java Within two lines of each other appears "fallthrough" and "fall through", both typed by the same person who should have been paying better attention and only one of these is correct and the inconsistency is bothersome. This commit fixes the errant one. --- .../action/support/replication/ReplicationOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 09b6a4d8220..5623d9bbc11 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -189,7 +189,7 @@ public class ReplicationOperation< try { primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); } catch (final AlreadyClosedException e) { - // okay, the index was deleted or this shard was never activated after a relocation; fallthrough and finish normally + // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally } catch (final Exception e) { // fail the primary but fall through and let the rest of operation processing complete final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard); From ddbc4687f6fbed13032698871ed42b21c5c93fd9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 11:39:36 -0400 Subject: [PATCH 008/208] Introduce clean transition on primary promotion This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. 
The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed. Relates #24925 --- .../elasticsearch/index/shard/IndexShard.java | 29 +- .../shard/IndexShardOperationPermits.java | 208 +++++++++--- .../IndexShardOperationPermitsTests.java | 320 +++++++++++++++++- .../index/shard/IndexShardTests.java | 108 ++++++ 4 files changed, 601 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5f1f94f72f7..6a790261252 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -131,6 +131,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -332,12 +333,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * notifies the shard of an increase in the primary term + * Notifies the shard of an increase in the primary term. + * + * @param newPrimaryTerm the new primary term */ - public void updatePrimaryTerm(final long newTerm) { + public void updatePrimaryTerm(final long newPrimaryTerm) { assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { - if (newTerm != primaryTerm) { + if (newPrimaryTerm != primaryTerm) { // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned // in one state causing it's term to be incremented. Note that if both current shard state and new // shard state are initializing, we could replace the current shard and reinitialize it. It is however @@ -354,10 +357,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl "a started primary shard should never update its term; " + "shard " + shardRouting + ", " + "current term [" + primaryTerm + "], " - + "new term [" + newTerm + "]"; - assert newTerm > primaryTerm : - "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]"; - primaryTerm = newTerm; + + "new term [" + newPrimaryTerm + "]"; + assert newPrimaryTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; + /* + * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we + * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is + * incremented. 
+ */ + final CountDownLatch latch = new CountDownLatch(1); + indexShardOperationPermits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + latch::await, + e -> failShard("exception during primary term transition", e)); + primaryTerm = newPrimaryTerm; + latch.countDown(); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index fea26168efa..83a372dd453 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -20,12 +20,14 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.threadpool.ThreadPool; @@ -36,20 +38,35 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; +/** + * Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition + * between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain + * and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has + * completed. + */ final class IndexShardOperationPermits implements Closeable { + private final ShardId shardId; private final Logger logger; private final ThreadPool threadPool; private static final int TOTAL_PERMITS = Integer.MAX_VALUE; - // fair semaphore to ensure that blockOperations() does not starve under thread contention - final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); - @Nullable private List> delayedOperations; // operations that are delayed + final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved + private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; + private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this - IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) { + /** + * Construct operation permits for the specified shards. + * + * @param shardId the shard + * @param logger the logger for the shard + * @param threadPool the thread pool (used to execute delayed operations) + */ + IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) { this.shardId = shardId; this.logger = logger; this.threadPool = threadPool; @@ -61,99 +78,170 @@ final class IndexShardOperationPermits implements Closeable { } /** - * Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. 
Queues - * operations that are occurring in the meanwhile and runs them once onBlocked has executed. + * Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues + * operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed. * - * @param timeout the maximum time to wait for the in-flight operations block - * @param timeUnit the time unit of the {@code timeout} argument + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument * @param onBlocked the action to run once the block has been acquired - * @throws InterruptedException if calling thread is interrupted - * @throws TimeoutException if timed out waiting for in-flight operations to finish + * @param the type of checked exception thrown by {@code onBlocked} + * @throws InterruptedException if calling thread is interrupted + * @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws IndexShardClosedException if operation permit has been closed */ - public void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable onBlocked) throws - InterruptedException, TimeoutException, E { + void blockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { if (closed) { throw new IndexShardClosedException(shardId); } + delayOperations(); try { - if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - assert semaphore.availablePermits() == 0; - try { - onBlocked.run(); - } finally { - semaphore.release(TOTAL_PERMITS); - } - } else { - throw new TimeoutException("timed out during blockOperations"); - } + doBlockOperations(timeout, timeUnit, onBlocked); } finally { - final List> queuedActions; + releaseDelayedOperations(); + } + } + + /** + * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} + * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After + * operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking + * operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked. 
+ * + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + * @param onBlocked the action to run once the block has been acquired + * @param onFailure the action to run if a failure occurs while blocking operations + * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) + */ + void asyncBlockOperations( + final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked, final Consumer onFailure) { + delayOperations(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + @Override + public void onFailure(final Exception e) { + onFailure.accept(e); + } + + @Override + protected void doRun() throws Exception { + doBlockOperations(timeout, timeUnit, onBlocked); + } + + @Override + public void onAfter() { + releaseDelayedOperations(); + } + }); + } + + private void delayOperations() { + synchronized (this) { + if (delayed) { + throw new IllegalStateException("operations are already delayed"); + } else { + assert delayedOperations.isEmpty(); + delayed = true; + } + } + } + + private void doBlockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { + if (Assertions.ENABLED) { + // since delayed is not volatile, we have to synchronize even here for visibility synchronized (this) { - queuedActions = delayedOperations; - delayedOperations = null; + assert delayed; } - if (queuedActions != null) { - // Try acquiring permits on fresh thread (for two reasons): - // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled. - // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by - // ThreadedActionListener if the queue of the thread pool on which it submits is full. - // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure - // handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery. 
-                threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
-                    for (ActionListener<Releasable> queuedAction : queuedActions) {
-                        acquire(queuedAction, null, false);
-                    }
-                });
+        }
+        if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
+            assert semaphore.availablePermits() == 0;
+            try {
+                onBlocked.run();
+            } finally {
+                semaphore.release(TOTAL_PERMITS);
             }
+        } else {
+            throw new TimeoutException("timeout while blocking operations");
+        }
+    }
+
+    private void releaseDelayedOperations() {
+        final List<ActionListener<Releasable>> queuedActions;
+        synchronized (this) {
+            assert delayed;
+            queuedActions = new ArrayList<>(delayedOperations);
+            delayedOperations.clear();
+            delayed = false;
+        }
+        if (!queuedActions.isEmpty()) {
+            /*
+             * Try acquiring permits on fresh thread (for two reasons):
+             *   - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled;
+             *     interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by
+             *     the threaded action listener if the queue of the thread pool on which it submits is full
+             *   - if a permit is acquired and the queue of the thread pool which the threaded action listener uses is full, the
+             *     onFailure handler is executed on the calling thread; this should not be the recovery thread as it would delay the
+             *     recovery
+             */
+            threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+                for (ActionListener<Releasable> queuedAction : queuedActions) {
+                    acquire(queuedAction, null, false);
+                }
+            });
         }
     }
 
     /**
      * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
      * {@link ActionListener} will be called on the calling thread. During calls of
-     * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
-     * then be called using the provided executor once operations are no longer blocked.
+     * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
+     * will then be called using the provided executor once operations are no longer blocked.
      *
      * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
      * @param executorOnDelay executor to use for delayed call
      * @param forceExecution whether the runnable should force its execution in case it gets rejected
      */
-    public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
+    public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
         if (closed) {
             onAcquired.onFailure(new IndexShardClosedException(shardId));
             return;
         }
-        Releasable releasable;
+        final Releasable releasable;
         try {
             synchronized (this) {
-                releasable = tryAcquire();
-                if (releasable == null) {
-                    // blockOperations is executing, this operation will be retried by blockOperations once it finishes
-                    if (delayedOperations == null) {
-                        delayedOperations = new ArrayList<>();
-                    }
+                if (delayed) {
                     final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                     if (executorOnDelay != null) {
                         delayedOperations.add(
-                            new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
-                                new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
+                                new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
+                                        new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                     } else {
                         delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                     }
                     return;
+                } else {
+                    releasable = tryAcquire();
+                    assert releasable != null;
                 }
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             onAcquired.onFailure(e);
             return;
         }
+        // execute this outside the synchronized block!
         onAcquired.onResponse(releasable);
     }
 
-    @Nullable private Releasable tryAcquire() throws InterruptedException {
-        if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
-            AtomicBoolean closed = new AtomicBoolean();
+    @Nullable
+    private Releasable tryAcquire() throws InterruptedException {
+        assert Thread.holdsLock(this);
+        if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
+            final AtomicBoolean closed = new AtomicBoolean();
             return () -> {
                 if (closed.compareAndSet(false, true)) {
                     semaphore.release(1);
@@ -163,13 +251,23 @@ final class IndexShardOperationPermits implements Closeable {
         return null;
     }
 
-    public int getActiveOperationsCount() {
+    /**
+     * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
+     *
+     * @return the active operation count, or zero when all permits are held
+     */
+    int getActiveOperationsCount() {
         int availablePermits = semaphore.availablePermits();
         if (availablePermits == 0) {
-            // when blockOperations is holding all permits
+            /*
+             * This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
+             * remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
+ */ return 0; } else { return TOTAL_PERMITS - availablePermits; } } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 18a250a4282..ec22f9d862b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -25,20 +25,30 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; public class IndexShardOperationPermitsTests extends ESTestCase { @@ -143,7 +153,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } @@ -184,7 +194,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }; - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { // we preserve the thread context here so that we have a different context in the call to acquire than the context present // when the releasable is closed try (ThreadContext.StoredContext ignore = context.newStoredContext(false)) { @@ -238,6 +248,202 @@ public class IndexShardOperationPermitsTests extends ESTestCase { }; } + public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedException { + final CountDownLatch blockAcquired = new CountDownLatch(1); + final CountDownLatch releaseBlock = new CountDownLatch(1); + final AtomicBoolean blocked = new AtomicBoolean(); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }, + e -> { + throw new RuntimeException(e); + }); + blockAcquired.await(); + assertTrue(blocked.get()); + + // an operation that is submitted while there is a delay in place should be delayed + final CountDownLatch delayedOperation = new CountDownLatch(1); + final AtomicBoolean delayed = new AtomicBoolean(); + final Thread thread = new Thread(() -> + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + delayed.set(true); + 
releasable.close(); + delayedOperation.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false)); + thread.start(); + assertFalse(delayed.get()); + releaseBlock.countDown(); + delayedOperation.await(); + assertTrue(delayed.get()); + thread.join(); + } + + public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedException, BrokenBarrierException { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); + final CountDownLatch firstOperationLatch = new CountDownLatch(1); + final CountDownLatch firstOperationCompleteLatch = new CountDownLatch(1); + final Thread firstOperationThread = + new Thread(controlledAcquire(barrier, operationExecutingLatch, firstOperationLatch, firstOperationCompleteLatch)); + firstOperationThread.start(); + + barrier.await(); + + operationExecutingLatch.await(); + + // now we will delay operations while the first operation is still executing (because it is latched) + final CountDownLatch blockedLatch = new CountDownLatch(1); + final AtomicBoolean onBlocked = new AtomicBoolean(); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + onBlocked.set(true); + blockedLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); + + assertFalse(onBlocked.get()); + + // if we submit another operation, it should be delayed + final CountDownLatch secondOperationExecuting = new CountDownLatch(1); + final CountDownLatch secondOperationComplete = new CountDownLatch(1); + final AtomicBoolean secondOperation = new AtomicBoolean(); + final Thread secondOperationThread = new Thread(() -> { + secondOperationExecuting.countDown(); + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + secondOperation.set(true); + releasable.close(); + secondOperationComplete.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }); + secondOperationThread.start(); + + secondOperationExecuting.await(); + assertFalse(secondOperation.get()); + + firstOperationLatch.countDown(); + firstOperationCompleteLatch.await(); + blockedLatch.await(); + assertTrue(onBlocked.get()); + + secondOperationComplete.await(); + assertTrue(secondOperation.get()); + + firstOperationThread.join(); + secondOperationThread.join(); + } + + public void testAsyncBlockOperationsRace() throws Exception { + // we racily submit operations and a delay, and then ensure that all operations were actually completed + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + 1 + operations); + final CountDownLatch operationLatch = new CountDownLatch(1 + operations); + final Set values = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final int value = i; + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + values.add(value); + releasable.close(); + operationLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false); + }); + thread.start(); + threads.add(thread); + } + + final Thread 
blockingThread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + values.add(operations); + operationLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); + }); + blockingThread.start(); + + barrier.await(); + + operationLatch.await(); + for (final Thread thread : threads) { + thread.join(); + } + blockingThread.join(); + + // check that all operations completed + for (int i = 0; i < operations; i++) { + assertTrue(values.contains(i)); + } + assertTrue(values.contains(operations)); + /* + * The block operation is executed on another thread and the operations can have completed before this thread has returned all the + * permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to + * the semaphore. + */ + awaitBusy(() -> { + for (final ThreadPoolStats.Stats stats : threadPool.stats()) { + if (ThreadPool.Names.GENERIC.equals(stats.getName())) { + return stats.getActive() == 0; + } + } + return false; + }); + } + public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); permits.acquire(future1, ThreadPool.Names.GENERIC, true); @@ -267,4 +473,114 @@ public class IndexShardOperationPermitsTests extends ESTestCase { future3.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(0)); } + + public void testAsyncBlockOperationsOnFailure() throws InterruptedException { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 10, + TimeUnit.MINUTES, + () -> { + throw new RuntimeException("simulated"); + }, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), instanceOf(RuntimeException.class)); + assertThat(reference.get(), hasToString(containsString("simulated"))); + } + + public void testTimeout() throws BrokenBarrierException, InterruptedException { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); + final CountDownLatch operationLatch = new CountDownLatch(1); + final CountDownLatch operationCompleteLatch = new CountDownLatch(1); + + final Thread thread = new Thread(controlledAcquire(barrier, operationExecutingLatch, operationLatch, operationCompleteLatch)); + thread.start(); + + barrier.await(); + + operationExecutingLatch.await(); + + { + final TimeoutException e = + expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {})); + assertThat(e, hasToString(containsString("timeout while blocking operations"))); + } + + { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 1, + TimeUnit.MILLISECONDS, + () -> {}, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); + } + + operationLatch.countDown(); + + operationCompleteLatch.await(); + + thread.join(); + } + + /** + * Returns an operation that acquires a permit and synchronizes in the following manner: + *
+     * <ul>
+     * <li>waits on the {@code barrier} before acquiring a permit</li>
+     * <li>counts down the {@code operationExecutingLatch} when it acquires the permit</li>
+     * <li>waits on the {@code operationLatch} before releasing the permit</li>
+     * <li>counts down the {@code operationCompleteLatch} after releasing the permit</li>
+     * </ul>
+ * + * @param barrier the barrier to wait on + * @param operationExecutingLatch the latch to countdown after acquiring the permit + * @param operationLatch the latch to wait on before releasing the permit + * @param operationCompleteLatch the latch to countdown after releasing the permit + * @return a controllable runnable that acquires a permit + */ + private Runnable controlledAcquire( + final CyclicBarrier barrier, + final CountDownLatch operationExecutingLatch, + final CountDownLatch operationLatch, + final CountDownLatch operationCompleteLatch) { + return () -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + operationExecutingLatch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + operationCompleteLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }; + } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6dce3dab3a9..e7aa3c61b4e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -280,6 +280,114 @@ public class IndexShardTests extends IndexShardTestCase { } } + public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); + final CountDownLatch operationLatch = new CountDownLatch(1); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + indexShard.getPrimaryTerm(), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + latch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.INDEX); + }); + thread.start(); + threads.add(thread); + } + + barrier.await(); + latch.await(); + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + final int delayedOperations = scaledRandomIntBetween(1, 64); + final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations); + final CountDownLatch delayedOperationsLatch = new CountDownLatch(delayedOperations); + final AtomicLong counter = new AtomicLong(); + final List 
delayedThreads = new ArrayList<>(); + for (int i = 0; i < delayedOperations; i++) { + final Thread thread = new Thread(() -> { + try { + delayedOperationsBarrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + counter.incrementAndGet(); + releasable.close(); + delayedOperationsLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.INDEX); + }); + thread.start(); + delayedThreads.add(thread); + } + + delayedOperationsBarrier.await(); + + assertThat(counter.get(), equalTo(0L)); + + operationLatch.countDown(); + for (final Thread thread : threads) { + thread.join(); + } + + delayedOperationsLatch.await(); + + assertThat(counter.get(), equalTo((long) delayedOperations)); + + for (final Thread thread : delayedThreads) { + thread.join(); + } + + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; From 5bcae914d9cb875cb83afed871f68009cddb75a2 Mon Sep 17 00:00:00 2001 From: Jack Conradson Date: Tue, 30 May 2017 09:03:46 -0700 Subject: [PATCH 009/208] Make PainlessScript an interface (#24966) Allows more flexibility for the specified script context interface if we want to allow script contexts to specify an abstract class instead. --- .../org/elasticsearch/painless/Compiler.java | 13 +++-- .../painless/PainlessScript.java | 58 +++++++++---------- .../painless/WriterConstants.java | 24 ++++++-- .../elasticsearch/painless/node/SSource.java | 47 ++++++++++++--- 4 files changed, 94 insertions(+), 48 deletions(-) diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/Compiler.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/Compiler.java index 0d92f109eea..0dc4bd88244 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/Compiler.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/Compiler.java @@ -29,7 +29,6 @@ import java.net.URL; import java.security.CodeSource; import java.security.SecureClassLoader; import java.security.cert.Certificate; -import java.util.BitSet; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.painless.WriterConstants.CLASS_NAME; @@ -68,7 +67,7 @@ final class Compiler { */ static final class Loader extends SecureClassLoader { private final AtomicInteger lambdaCounter = new AtomicInteger(0); - + /** * @param parent The parent ClassLoader. */ @@ -95,7 +94,7 @@ final class Compiler { Class defineLambda(String name, byte[] bytes) { return defineClass(name, bytes, 0, bytes.length, CODESOURCE); } - + /** * A counter used to generate a unique name for each lambda * function/reference class in this classloader. 
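The next hunk replaces the generated script's three-argument constructor with a no-argument one and instead injects the script name, source, and statement offsets into public static fields by reflection. As a rough, self-contained sketch of that reflection pattern only (the class and values below are placeholders, not the actual Painless classes), it amounts to:

    import java.lang.reflect.Constructor;

    // Sketch: populate public static fields on a (placeholder) generated class, then
    // instantiate it through its no-arg constructor instead of passing the values in.
    public class StaticFieldInjectionDemo {
        public static class GeneratedScript {
            public static String $NAME;
            public static String $SOURCE;

            @Override
            public String toString() {
                return $NAME + ": " + $SOURCE;
            }
        }

        public static void main(String[] args) throws Exception {
            Class<?> clazz = GeneratedScript.class; // in the patch this comes from loader.defineScript(...)
            clazz.getField("$NAME").set(null, "my_script");
            clazz.getField("$SOURCE").set(null, "params.value * 2");
            Constructor<?> constructor = clazz.getConstructor();
            Object script = constructor.newInstance();
            System.out.println(script); // prints: my_script: params.value * 2
        }
    }
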
@@ -132,11 +131,13 @@ final class Compiler { try { Class clazz = loader.defineScript(CLASS_NAME, root.getBytes()); + clazz.getField("$NAME").set(null, name); + clazz.getField("$SOURCE").set(null, source); + clazz.getField("$STATEMENTS").set(null, root.getStatements()); clazz.getField("$DEFINITION").set(null, definition); - java.lang.reflect.Constructor constructor = - clazz.getConstructor(String.class, String.class, BitSet.class); + java.lang.reflect.Constructor constructor = clazz.getConstructor(); - return iface.cast(constructor.newInstance(name, source, root.getStatements())); + return iface.cast(constructor.newInstance()); } catch (Exception exception) { // Catch everything to let the user know this is something caused internally. throw new IllegalStateException("An internal error occurred attempting to define the script [" + name + "].", exception); } diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java index 643837d8db0..9aab5c438b0 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java @@ -29,25 +29,25 @@ import java.util.Map; /** * Abstract superclass on top of which all Painless scripts are built. */ -public abstract class PainlessScript { - /** - * Name of the script set at compile time. - */ - private final String name; - /** - * Source of the script. - */ - private final String source; - /** - * Character number of the start of each statement. - */ - private final BitSet statements; +public interface PainlessScript { - protected PainlessScript(String name, String source, BitSet statements) { - this.name = name; - this.source = source; - this.statements = statements; - } + /** + * @return The name of the script retrieved from a static variable generated + * during compilation of a Painless script. + */ + String getName(); + + /** + * @return The source for a script retrieved from a static variable generated + * during compilation of a Painless script. + */ + String getSource(); + + /** + * @return The {@link BitSet} tracking the boundaries for statements necessary + * for good exception messages. + */ + BitSet getStatements(); /** * Adds stack trace and other useful information to exceptions thrown @@ -55,7 +55,7 @@ public abstract class PainlessScript { * @param t The throwable to build an exception around. * @return The generated ScriptException. 
*/ - protected final ScriptException convertToScriptException(Throwable t, Map> extraMetadata) { + default ScriptException convertToScriptException(Throwable t, Map> extraMetadata) { // create a script stack: this is just the script portion List scriptStack = new ArrayList<>(); for (StackTraceElement element : t.getStackTrace()) { @@ -73,10 +73,10 @@ public abstract class PainlessScript { } int endOffset = getNextStatement(startOffset); if (endOffset == -1) { - endOffset = source.length(); + endOffset = getSource().length(); } // TODO: if this is still too long, truncate and use ellipses - String snippet = source.substring(startOffset, endOffset); + String snippet = getSource().substring(startOffset, endOffset); scriptStack.add(snippet); StringBuilder pointer = new StringBuilder(); for (int i = startOffset; i < offset; i++) { @@ -93,10 +93,10 @@ public abstract class PainlessScript { } // build a name for the script: final String name; - if (PainlessScriptEngine.INLINE_NAME.equals(this.name)) { - name = source; + if (PainlessScriptEngine.INLINE_NAME.equals(getName())) { + name = getSource(); } else { - name = this.name; + name = getName(); } ScriptException scriptException = new ScriptException("runtime error", t, scriptStack, name, PainlessScriptEngine.NAME); for (Map.Entry> entry : extraMetadata.entrySet()) { @@ -106,7 +106,7 @@ public abstract class PainlessScript { } /** returns true for methods that are part of the runtime */ - private static boolean shouldFilter(StackTraceElement element) { + default boolean shouldFilter(StackTraceElement element) { return element.getClassName().startsWith("org.elasticsearch.painless.") || element.getClassName().startsWith("java.lang.invoke.") || element.getClassName().startsWith("sun.invoke."); @@ -115,14 +115,14 @@ public abstract class PainlessScript { /** * Finds the start of the first statement boundary that is on or before {@code offset}. If one is not found, {@code -1} is returned. */ - private int getPreviousStatement(int offset) { - return statements.previousSetBit(offset); + default int getPreviousStatement(int offset) { + return getStatements().previousSetBit(offset); } /** * Finds the start of the first statement boundary that is after {@code offset}. If one is not found, {@code -1} is returned. 
*/ - private int getNextStatement(int offset) { - return statements.nextSetBit(offset + 1); + default int getNextStatement(int offset) { + return getStatements().nextSetBit(offset + 1); } } diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/WriterConstants.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/WriterConstants.java index af9f84424b4..e2e235dfa4d 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/WriterConstants.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/WriterConstants.java @@ -46,19 +46,28 @@ public final class WriterConstants { public static final int CLASS_VERSION = Opcodes.V1_8; public static final int ASM_VERSION = Opcodes.ASM5; - public static final String BASE_CLASS_NAME = PainlessScript.class.getName(); - public static final Type BASE_CLASS_TYPE = Type.getType(PainlessScript.class); + public static final String BASE_INTERFACE_NAME = PainlessScript.class.getName(); + public static final Type BASE_INTERFACE_TYPE = Type.getType(PainlessScript.class); public static final Method CONVERT_TO_SCRIPT_EXCEPTION_METHOD = getAsmMethod(ScriptException.class, "convertToScriptException", Throwable.class, Map.class); - public static final String CLASS_NAME = BASE_CLASS_NAME + "$Script"; - public static final Type CLASS_TYPE = Type.getObjectType(CLASS_NAME.replace('.', '/')); - + public static final String CLASS_NAME = BASE_INTERFACE_NAME + "$Script"; + public static final Type CLASS_TYPE = Type.getObjectType(CLASS_NAME.replace('.', '/')); + public static final String CTOR_METHOD_NAME = ""; - public static final Method CONSTRUCTOR = getAsmMethod(void.class, CTOR_METHOD_NAME, String.class, String.class, BitSet.class); + public static final Method CONSTRUCTOR = getAsmMethod(void.class, CTOR_METHOD_NAME); public static final Method CLINIT = getAsmMethod(void.class, ""); + public static final String GET_NAME_NAME = "getName"; + public static final Method GET_NAME_METHOD = getAsmMethod(String.class, GET_NAME_NAME); + + public static final String GET_SOURCE_NAME = "getSource"; + public static final Method GET_SOURCE_METHOD = getAsmMethod(String.class, GET_SOURCE_NAME); + + public static final String GET_STATEMENTS_NAME = "getStatements"; + public static final Method GET_STATEMENTS_METHOD = getAsmMethod(BitSet.class, GET_STATEMENTS_NAME); + // All of these types are caught by the main method and rethrown as ScriptException public static final Type PAINLESS_ERROR_TYPE = Type.getType(PainlessError.class); public static final Type BOOTSTRAP_METHOD_ERROR_TYPE = Type.getType(BootstrapMethodError.class); @@ -68,6 +77,9 @@ public final class WriterConstants { public static final Type PAINLESS_EXPLAIN_ERROR_TYPE = Type.getType(PainlessExplainError.class); public static final Method PAINLESS_EXPLAIN_ERROR_GET_HEADERS_METHOD = getAsmMethod(Map.class, "getHeaders", Definition.class); + public static final Type OBJECT_TYPE = Type.getType(Object.class); + public static final Type BITSET_TYPE = Type.getType(BitSet.class); + public static final Type DEFINITION_TYPE = Type.getType(Definition.class); public static final Type COLLECTIONS_TYPE = Type.getType(Collections.class); diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/node/SSource.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/node/SSource.java index eb323e3293f..1ab39ea20e9 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/node/SSource.java +++ 
b/modules/lang-painless/src/main/java/org/elasticsearch/painless/node/SSource.java @@ -53,7 +53,8 @@ import java.util.Set; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableSet; -import static org.elasticsearch.painless.WriterConstants.BASE_CLASS_TYPE; +import static org.elasticsearch.painless.WriterConstants.BASE_INTERFACE_TYPE; +import static org.elasticsearch.painless.WriterConstants.BITSET_TYPE; import static org.elasticsearch.painless.WriterConstants.BOOTSTRAP_METHOD_ERROR_TYPE; import static org.elasticsearch.painless.WriterConstants.CLASS_TYPE; import static org.elasticsearch.painless.WriterConstants.COLLECTIONS_TYPE; @@ -65,11 +66,16 @@ import static org.elasticsearch.painless.WriterConstants.DEF_BOOTSTRAP_DELEGATE_ import static org.elasticsearch.painless.WriterConstants.DEF_BOOTSTRAP_METHOD; import static org.elasticsearch.painless.WriterConstants.EMPTY_MAP_METHOD; import static org.elasticsearch.painless.WriterConstants.EXCEPTION_TYPE; +import static org.elasticsearch.painless.WriterConstants.GET_NAME_METHOD; +import static org.elasticsearch.painless.WriterConstants.GET_SOURCE_METHOD; +import static org.elasticsearch.painless.WriterConstants.GET_STATEMENTS_METHOD; +import static org.elasticsearch.painless.WriterConstants.OBJECT_TYPE; import static org.elasticsearch.painless.WriterConstants.OUT_OF_MEMORY_ERROR_TYPE; import static org.elasticsearch.painless.WriterConstants.PAINLESS_ERROR_TYPE; import static org.elasticsearch.painless.WriterConstants.PAINLESS_EXPLAIN_ERROR_GET_HEADERS_METHOD; import static org.elasticsearch.painless.WriterConstants.PAINLESS_EXPLAIN_ERROR_TYPE; import static org.elasticsearch.painless.WriterConstants.STACK_OVERFLOW_ERROR_TYPE; +import static org.elasticsearch.painless.WriterConstants.STRING_TYPE; /** * The root of all Painless trees. Contains a series of statements. 
@@ -203,9 +209,9 @@ public final class SSource extends AStatement { int classFrames = ClassWriter.COMPUTE_FRAMES | ClassWriter.COMPUTE_MAXS; int classAccess = Opcodes.ACC_PUBLIC | Opcodes.ACC_SUPER | Opcodes.ACC_FINAL; - String classBase = BASE_CLASS_TYPE.getInternalName(); + String interfaceBase = BASE_INTERFACE_TYPE.getInternalName(); String className = CLASS_TYPE.getInternalName(); - String classInterfaces[] = new String[] { Type.getType(scriptInterface.getInterface()).getInternalName() }; + String classInterfaces[] = new String[] { interfaceBase, Type.getType(scriptInterface.getInterface()).getInternalName() }; ClassWriter writer = new ClassWriter(classFrames); ClassVisitor visitor = writer; @@ -218,7 +224,7 @@ public final class SSource extends AStatement { if (debugStream != null) { visitor = new TraceClassVisitor(visitor, debugStream, null); } - visitor.visit(WriterConstants.CLASS_VERSION, classAccess, className, null, classBase, classInterfaces); + visitor.visit(WriterConstants.CLASS_VERSION, classAccess, className, null, OBJECT_TYPE.getInternalName(), classInterfaces); visitor.visitSource(Location.computeSourceName(name, source), null); // Write the a method to bootstrap def calls @@ -231,6 +237,11 @@ public final class SSource extends AStatement { bootstrapDef.returnValue(); bootstrapDef.endMethod(); + // Write static variables for name, source and statements used for writing exception messages + visitor.visitField(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, "$NAME", STRING_TYPE.getDescriptor(), null, null).visitEnd(); + visitor.visitField(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, "$SOURCE", STRING_TYPE.getDescriptor(), null, null).visitEnd(); + visitor.visitField(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, "$STATEMENTS", BITSET_TYPE.getDescriptor(), null, null).visitEnd(); + // Write the static variable used by the method to bootstrap def calls visitor.visitField(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, "$DEFINITION", DEFINITION_TYPE.getDescriptor(), null, null).visitEnd(); @@ -239,10 +250,32 @@ public final class SSource extends AStatement { constructor.visitCode(); constructor.loadThis(); constructor.loadArgs(); - constructor.invokeConstructor(BASE_CLASS_TYPE, CONSTRUCTOR); + constructor.invokeConstructor(OBJECT_TYPE, CONSTRUCTOR); constructor.returnValue(); constructor.endMethod(); + // Write a method to get static variable source + MethodWriter nameMethod = new MethodWriter(Opcodes.ACC_PUBLIC, GET_NAME_METHOD, visitor, globals.getStatements(), settings); + nameMethod.visitCode(); + nameMethod.getStatic(CLASS_TYPE, "$NAME", STRING_TYPE); + nameMethod.returnValue(); + nameMethod.endMethod(); + + // Write a method to get static variable source + MethodWriter sourceMethod = new MethodWriter(Opcodes.ACC_PUBLIC, GET_SOURCE_METHOD, visitor, globals.getStatements(), settings); + sourceMethod.visitCode(); + sourceMethod.getStatic(CLASS_TYPE, "$SOURCE", STRING_TYPE); + sourceMethod.returnValue(); + sourceMethod.endMethod(); + + // Write a method to get static variable statements + MethodWriter statementsMethod = + new MethodWriter(Opcodes.ACC_PUBLIC, GET_STATEMENTS_METHOD, visitor, globals.getStatements(), settings); + statementsMethod.visitCode(); + statementsMethod.getStatic(CLASS_TYPE, "$STATEMENTS", BITSET_TYPE); + statementsMethod.returnValue(); + statementsMethod.endMethod(); + // Write the method defined in the interface: MethodWriter executeMethod = new MethodWriter(Opcodes.ACC_PUBLIC, scriptInterface.getExecuteMethod(), visitor, globals.getStatements(), settings); @@ -357,7 +390,7 
@@ public final class SSource extends AStatement { writer.dup(); writer.getStatic(CLASS_TYPE, "$DEFINITION", DEFINITION_TYPE); writer.invokeVirtual(PAINLESS_EXPLAIN_ERROR_TYPE, PAINLESS_EXPLAIN_ERROR_GET_HEADERS_METHOD); - writer.invokeVirtual(BASE_CLASS_TYPE, CONVERT_TO_SCRIPT_EXCEPTION_METHOD); + writer.invokeInterface(BASE_INTERFACE_TYPE, CONVERT_TO_SCRIPT_EXCEPTION_METHOD); writer.throwException(); // This looks like: // } catch (PainlessError | BootstrapMethodError | OutOfMemoryError | StackOverflowError | Exception e) { @@ -373,7 +406,7 @@ public final class SSource extends AStatement { writer.loadThis(); writer.swap(); writer.invokeStatic(COLLECTIONS_TYPE, EMPTY_MAP_METHOD); - writer.invokeVirtual(BASE_CLASS_TYPE, CONVERT_TO_SCRIPT_EXCEPTION_METHOD); + writer.invokeInterface(BASE_INTERFACE_TYPE, CONVERT_TO_SCRIPT_EXCEPTION_METHOD); writer.throwException(); writer.mark(endCatch); } From 2a6e6866bd6a25f469c07ba0766edec6272e93ac Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 30 May 2017 09:42:26 -0700 Subject: [PATCH 010/208] Fix floating-point error when DateProcessor parses UNIX (#24947) DateProcessor's DateFormat UNIX format parser resulted in a floating point rounding error when parsing certain stringed epoch times. Now Double.parseDouble is used, preserving the intented input. --- .../main/java/org/elasticsearch/ingest/common/DateFormat.java | 2 +- .../java/org/elasticsearch/ingest/common/DateFormatTests.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateFormat.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateFormat.java index 9ead2d05f7d..bf664afb407 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateFormat.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateFormat.java @@ -38,7 +38,7 @@ enum DateFormat { Unix { @Override Function getFunction(String format, DateTimeZone timezone, Locale locale) { - return (date) -> new DateTime((long)(Float.parseFloat(date) * 1000), timezone); + return (date) -> new DateTime((long)(Double.parseDouble(date) * 1000), timezone); } }, UnixMs { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateFormatTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateFormatTests.java index 886630dbbd9..415ee872093 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateFormatTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateFormatTests.java @@ -50,6 +50,10 @@ public class DateFormatTests extends ESTestCase { assertThat(DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null).apply("1000.5").getMillis(), equalTo(1000500L)); } + public void testParseUnixWithMsPrecision() { + assertThat(DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null).apply("1495718015").getMillis(), equalTo(1495718015000L)); + } + public void testParseISO8601() { assertThat(DateFormat.Iso8601.getFunction(null, DateTimeZone.UTC, null).apply("2001-01-01T00:00:00-0800").getMillis(), equalTo(978336000000L)); From ce7195d81abafd0a880ebbc2d1514ab608e56b79 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 30 May 2017 19:13:07 +0200 Subject: [PATCH 011/208] Terms aggregation should remap global ordinal buckets when a sub-aggregator is used to sort the terms (#24941) `terms` aggregations at the root level use the `global_ordinals` execution hint by default. 
When all sub-aggregators can be run in `breadth_first` mode the collected buckets for these sub-aggs are dense (remapped after the initial pruning). But if a sub-aggregator is not deferrable and needs to collect all buckets before pruning we don't remap global ords and the aggregator needs to deal with sparse buckets. Most (if not all) aggregators expect dense buckets and uses this information to allocate memories. This change forces the remap of the global ordinals but only when there is at least one sub-aggregator that cannot be deferred. Relates #24788 --- ...balOrdinalsSignificantTermsAggregator.java | 89 ++---- .../SignificantTermsAggregatorFactory.java | 97 +++++-- .../GlobalOrdinalsStringTermsAggregator.java | 260 ++++++++---------- .../bucket/terms/TermsAggregatorFactory.java | 127 ++++++--- .../bucket/terms/support/IncludeExclude.java | 12 +- .../terms/TermsAggregatorFactoryTests.java | 3 +- .../bucket/terms/TermsAggregatorTests.java | 48 ++++ .../aggregation/AggregationProfilerIT.java | 10 +- 8 files changed, 357 insertions(+), 289 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java index 98effdcfd54..50f114b35b5 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java @@ -53,22 +53,28 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri protected final SignificantTermsAggregatorFactory termsAggFactory; private final SignificanceHeuristic significanceHeuristic; - public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories, - ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, DocValueFormat format, - BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude, - SearchContext context, Aggregator parent, - SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory, - List pipelineAggregators, Map metaData) throws IOException { - + public GlobalOrdinalsSignificantTermsAggregator(String name, + AggregatorFactories factories, + ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, + DocValueFormat format, + BucketCountThresholds bucketCountThresholds, + IncludeExclude.OrdinalsFilter includeExclude, + SearchContext context, + Aggregator parent, + boolean forceRemapGlobalOrds, + SignificanceHeuristic significanceHeuristic, + SignificantTermsAggregatorFactory termsAggFactory, + List pipelineAggregators, + Map metaData) throws IOException { super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent, - SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData); + forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData); this.significanceHeuristic = significanceHeuristic; this.termsAggFactory = termsAggFactory; + this.numCollectedDocs = 0; } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { return new 
LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) { @Override public void collect(int doc, long bucket) throws IOException { @@ -78,18 +84,17 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri }; } - @Override public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException { assert owningBucketOrdinal == 0; - if (globalOrds == null) { // no context in this reader + if (valueCount == 0) { // no context in this reader return buildEmptyAggregation(); } final int size; if (bucketCountThresholds.getMinDocCount() == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(globalOrds.getValueCount(), bucketCountThresholds.getShardSize()); + size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); } else { size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); } @@ -98,7 +103,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); SignificantStringTerms.Bucket spare = null; - for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) { + for (long globalTermOrd = 0; globalTermOrd < valueCount; ++globalTermOrd) { if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) { continue; } @@ -115,7 +120,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format); } spare.bucketOrd = bucketOrd; - copy(globalOrds.lookupOrd(globalTermOrd), spare.termBytes); + copy(lookupGlobalOrd.apply(globalTermOrd), spare.termBytes); spare.subsetDf = bucketDocCount; spare.subsetSize = subsetSize; spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); @@ -148,63 +153,13 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri IndexReader topReader = searcher.getIndexReader(); int supersetSize = topReader.numDocs(); return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList()); + pipelineAggregators(), metaData(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList()); } @Override protected void doClose() { + super.doClose(); Releasables.close(termsAggFactory); } - - public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator { - - private final LongHash bucketOrds; - - public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, - DocValueFormat format, BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude, - SearchContext context, Aggregator parent, SignificanceHeuristic significanceHeuristic, - SignificantTermsAggregatorFactory termsAggFactory, List pipelineAggregators, - Map metaData) throws IOException { - super(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent, significanceHeuristic, - termsAggFactory, pipelineAggregators, metaData); - bucketOrds = new LongHash(1, context.bigArrays()); - } - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) { - 
@Override - public void collect(int doc, long bucket) throws IOException { - assert bucket == 0; - numCollectedDocs++; - if (globalOrds.advanceExact(doc)) { - for (long globalOrd = globalOrds.nextOrd(); - globalOrd != SortedSetDocValues.NO_MORE_ORDS; - globalOrd = globalOrds.nextOrd()) { - long bucketOrd = bucketOrds.add(globalOrd); - if (bucketOrd < 0) { - bucketOrd = -1 - bucketOrd; - collectExistingBucket(sub, doc, bucketOrd); - } else { - collectBucket(sub, doc, bucketOrd); - } - } - } - } - }; - } - - @Override - protected long getBucketOrd(long termOrd) { - return bucketOrds.find(termOrd); - } - - @Override - protected void doClose() { - Releasables.close(termsAggFactory, bucketOrds); - } - } - } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java index c27cabf78b2..ba5191b1229 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java @@ -70,10 +70,17 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac private final TermsAggregator.BucketCountThresholds bucketCountThresholds; private final SignificanceHeuristic significanceHeuristic; - public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig config, IncludeExclude includeExclude, - String executionHint, QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds, - SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory parent, - AggregatorFactories.Builder subFactoriesBuilder, Map metaData) throws IOException { + public SignificantTermsAggregatorFactory(String name, + ValuesSourceConfig config, + IncludeExclude includeExclude, + String executionHint, + QueryBuilder filterBuilder, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + SignificanceHeuristic significanceHeuristic, + SearchContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { super(name, config, context, parent, subFactoriesBuilder, metaData); this.includeExclude = includeExclude; this.executionHint = executionHint; @@ -246,44 +253,71 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac MAP(new ParseField("map")) { @Override - Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format, - TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, - SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic, - SignificantTermsAggregatorFactory termsAggregatorFactory, List pipelineAggregators, - Map metaData) throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext aggregationContext, + Aggregator parent, + SignificanceHeuristic significanceHeuristic, + SignificantTermsAggregatorFactory termsAggregatorFactory, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.StringFilter filter = includeExclude == null ? 
null : includeExclude.convertToStringFilter(format); return new SignificantStringTermsAggregator(name, factories, valuesSource, format, bucketCountThresholds, filter, aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData); + } }, GLOBAL_ORDINALS(new ParseField("global_ordinals")) { @Override - Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format, - TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, - SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic, - SignificantTermsAggregatorFactory termsAggregatorFactory, List pipelineAggregators, - Map metaData) throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext aggregationContext, + Aggregator parent, + SignificanceHeuristic significanceHeuristic, + SignificantTermsAggregatorFactory termsAggregatorFactory, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format); return new GlobalOrdinalsSignificantTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, - aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData); + aggregationContext, parent, false, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData); + } }, GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) { @Override - Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format, - TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, - SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic, - SignificantTermsAggregatorFactory termsAggregatorFactory, List pipelineAggregators, - Map metaData) throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext aggregationContext, + Aggregator parent, + SignificanceHeuristic significanceHeuristic, + SignificantTermsAggregatorFactory termsAggregatorFactory, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? 
null : includeExclude.convertToOrdinalsFilter(format); - return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories, - (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, - aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData); + return new GlobalOrdinalsSignificantTermsAggregator(name, factories, + (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, aggregationContext, parent, + true, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData); + } }; @@ -302,11 +336,18 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac this.parseField = parseField; } - abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format, - TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, - SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic, - SignificantTermsAggregatorFactory termsAggregatorFactory, List pipelineAggregators, - Map metaData) throws IOException; + abstract Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext aggregationContext, + Aggregator parent, + SignificanceHeuristic significanceHeuristic, + SignificantTermsAggregatorFactory termsAggregatorFactory, + List pipelineAggregators, + Map metaData) throws IOException; @Override public String toString() { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 33bbc370c6e..d9c60da7acc 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.terms; import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; @@ -52,6 +53,8 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; + /** * An aggregator of string values that relies on global ordinals in order to build buckets. */ @@ -66,67 +69,104 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr // first defined one. // So currently for each instance of this aggregator the acceptedglobalValues will be computed, this is unnecessary // especially if this agg is on a second layer or deeper. 
- protected LongBitSet acceptedGlobalOrdinals; + protected final LongBitSet acceptedGlobalOrdinals; + protected final long valueCount; + protected final GlobalOrdLookupFunction lookupGlobalOrd; - protected SortedSetDocValues globalOrds; + private final LongHash bucketOrds; - public GlobalOrdinalsStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource, - BucketOrder order, DocValueFormat format, BucketCountThresholds bucketCountThresholds, - IncludeExclude.OrdinalsFilter includeExclude, SearchContext context, Aggregator parent, - SubAggCollectionMode collectionMode, boolean showTermDocCountError, List pipelineAggregators, - Map metaData) throws IOException { + public interface GlobalOrdLookupFunction { + BytesRef apply(long ord) throws IOException; + } + + public GlobalOrdinalsStringTermsAggregator(String name, AggregatorFactories factories, + ValuesSource.Bytes.WithOrdinals valuesSource, + BucketOrder order, + DocValueFormat format, + BucketCountThresholds bucketCountThresholds, + IncludeExclude.OrdinalsFilter includeExclude, + SearchContext context, + Aggregator parent, + boolean forceRemapGlobalOrds, + SubAggCollectionMode collectionMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, pipelineAggregators, metaData); this.valuesSource = valuesSource; this.includeExclude = includeExclude; + final IndexReader reader = context.searcher().getIndexReader(); + final SortedSetDocValues values = reader.leaves().size() > 0 ? + valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet(); + this.valueCount = values.getValueCount(); + this.lookupGlobalOrd = values::lookupOrd; + this.acceptedGlobalOrdinals = includeExclude != null ? includeExclude.acceptedGlobalOrdinals(values) : null; + + /** + * Remap global ords to dense bucket ordinals if any sub-aggregator cannot be deferred. + * Sub-aggregators expect dense buckets and allocate memories based on this assumption. + * Deferred aggregators are safe because the selected ordinals are remapped when the buckets + * are replayed. + */ + boolean remapGlobalOrds = forceRemapGlobalOrds || Arrays.stream(subAggregators).anyMatch((a) -> shouldDefer(a) == false); + this.bucketOrds = remapGlobalOrds ? new LongHash(1, context.bigArrays()) : null; } - protected long getBucketOrd(long termOrd) { - return termOrd; + + boolean remapGlobalOrds() { + return bucketOrds != null; + } + + protected final long getBucketOrd(long globalOrd) { + return bucketOrds == null ? globalOrd : bucketOrds.find(globalOrd); + } + + private void collectGlobalOrd(int doc, long globalOrd, LeafBucketCollector sub) throws IOException { + if (bucketOrds == null) { + collectExistingBucket(sub, doc, globalOrd); + } else { + long bucketOrd = bucketOrds.add(globalOrd); + if (bucketOrd < 0) { + bucketOrd = -1 - bucketOrd; + collectExistingBucket(sub, doc, bucketOrd); + } else { + collectBucket(sub, doc, bucketOrd); + } + } + } + + private SortedSetDocValues getGlobalOrds(LeafReaderContext ctx) throws IOException { + return acceptedGlobalOrdinals == null ? 
+ valuesSource.globalOrdinalsValues(ctx) : new FilteredOrdinals(valuesSource.globalOrdinalsValues(ctx), acceptedGlobalOrdinals); } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - - globalOrds = valuesSource.globalOrdinalsValues(ctx); - - if (acceptedGlobalOrdinals == null && includeExclude != null) { - acceptedGlobalOrdinals = includeExclude.acceptedGlobalOrdinals(globalOrds); + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + final SortedSetDocValues globalOrds = getGlobalOrds(ctx); + if (bucketOrds == null) { + grow(globalOrds.getValueCount()); } - - if (acceptedGlobalOrdinals != null) { - globalOrds = new FilteredOrdinals(globalOrds, acceptedGlobalOrdinals); - } - - return newCollector(globalOrds, sub); - } - - protected LeafBucketCollector newCollector(final SortedSetDocValues ords, - final LeafBucketCollector sub) { - grow(ords.getValueCount()); - final SortedDocValues singleValues = DocValues.unwrapSingleton(ords); + final SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds); if (singleValues != null) { - return new LeafBucketCollectorBase(sub, ords) { + return new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long bucket) throws IOException { assert bucket == 0; if (singleValues.advanceExact(doc)) { final int ord = singleValues.ordValue(); - collectExistingBucket(sub, doc, ord); + collectGlobalOrd(doc, ord, sub); } } }; } else { - return new LeafBucketCollectorBase(sub, ords) { + return new LeafBucketCollectorBase(sub, globalOrds) { @Override public void collect(int doc, long bucket) throws IOException { assert bucket == 0; - if (ords.advanceExact(doc)) { - for (long globalOrd = ords.nextOrd(); - globalOrd != SortedSetDocValues.NO_MORE_ORDS; - globalOrd = ords.nextOrd()) { - collectExistingBucket(sub, doc, globalOrd); + if (globalOrds.advanceExact(doc)) { + for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) { + collectGlobalOrd(doc, globalOrd, sub); } } } @@ -145,21 +185,21 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException { - if (globalOrds == null) { // no context in this reader + if (valueCount == 0) { // no context in this reader return buildEmptyAggregation(); } final int size; if (bucketCountThresholds.getMinDocCount() == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(globalOrds.getValueCount(), bucketCountThresholds.getShardSize()); + size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); } else { size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); } long otherDocCount = 0; BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(this)); OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0); - for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) { + for (long globalTermOrd = 0; globalTermOrd < valueCount; ++globalTermOrd) { if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) { continue; } @@ -184,10 +224,10 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()]; long 
survivingBucketOrds[] = new long[ordered.size()]; for (int i = ordered.size() - 1; i >= 0; --i) { - final OrdBucket bucket = (OrdBucket) ordered.pop(); + final OrdBucket bucket = ordered.pop(); survivingBucketOrds[i] = bucket.bucketOrd; BytesRef scratch = new BytesRef(); - copy(globalOrds.lookupOrd(bucket.globalOrd), scratch); + copy(lookupGlobalOrd.apply(bucket.globalOrd), scratch); list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0, format); list[i].bucketOrd = bucket.bucketOrd; otherDocCount -= list[i].docCount; @@ -254,76 +294,9 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } } - /** - * Variant of {@link GlobalOrdinalsStringTermsAggregator} that rebases hashes in order to make them dense. Might be - * useful in case few hashes are visited. - */ - public static class WithHash extends GlobalOrdinalsStringTermsAggregator { - - private final LongHash bucketOrds; - - public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource, BucketOrder order, - DocValueFormat format, BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude, - SearchContext context, Aggregator parent, SubAggCollectionMode collectionMode, - boolean showTermDocCountError, List pipelineAggregators, Map metaData) - throws IOException { - super(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude, context, parent, collectionMode, - showTermDocCountError, pipelineAggregators, metaData); - bucketOrds = new LongHash(1, context.bigArrays()); - } - - @Override - protected LeafBucketCollector newCollector(final SortedSetDocValues ords, - final LeafBucketCollector sub) { - final SortedDocValues singleValues = DocValues.unwrapSingleton(ords); - if (singleValues != null) { - return new LeafBucketCollectorBase(sub, ords) { - @Override - public void collect(int doc, long bucket) throws IOException { - if (singleValues.advanceExact(doc)) { - final int globalOrd = singleValues.ordValue(); - long bucketOrd = bucketOrds.add(globalOrd); - if (bucketOrd < 0) { - bucketOrd = -1 - bucketOrd; - collectExistingBucket(sub, doc, bucketOrd); - } else { - collectBucket(sub, doc, bucketOrd); - } - } - } - }; - } else { - return new LeafBucketCollectorBase(sub, ords) { - @Override - public void collect(int doc, long bucket) throws IOException { - if (ords.advanceExact(doc)) { - for (long globalOrd = ords.nextOrd(); - globalOrd != SortedSetDocValues.NO_MORE_ORDS; - globalOrd = ords.nextOrd()) { - long bucketOrd = bucketOrds.add(globalOrd); - if (bucketOrd < 0) { - bucketOrd = -1 - bucketOrd; - collectExistingBucket(sub, doc, bucketOrd); - } else { - collectBucket(sub, doc, bucketOrd); - } - } - } - } - }; - } - } - - @Override - protected long getBucketOrd(long termOrd) { - return bucketOrds.find(termOrd); - } - - @Override - protected void doClose() { - Releasables.close(bucketOrds); - } - + @Override + protected void doClose() { + Releasables.close(bucketOrds); } /** @@ -331,32 +304,44 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * instead of on the fly for each match.This is beneficial for low cardinality fields, because it can reduce * the amount of look-ups significantly. 
*/ - public static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { + static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { private IntArray segmentDocCounts; - + private SortedSetDocValues globalOrds; private SortedSetDocValues segmentOrds; - public LowCardinality(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals valuesSource, - BucketOrder order, DocValueFormat format, - BucketCountThresholds bucketCountThresholds, SearchContext context, Aggregator parent, - SubAggCollectionMode collectionMode, boolean showTermDocCountError, List pipelineAggregators, - Map metaData) throws IOException { - super(name, factories, valuesSource, order, format, bucketCountThresholds, null, context, parent, collectionMode, - showTermDocCountError, pipelineAggregators, metaData); + LowCardinality(String name, + AggregatorFactories factories, + ValuesSource.Bytes.WithOrdinals valuesSource, + BucketOrder order, + DocValueFormat format, + BucketCountThresholds bucketCountThresholds, + SearchContext context, + Aggregator parent, + boolean forceDenseMode, + SubAggCollectionMode collectionMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { + super(name, factories, valuesSource, order, format, bucketCountThresholds, null, + context, parent, forceDenseMode, collectionMode, showTermDocCountError, pipelineAggregators, metaData); assert factories == null || factories.countAggregators() == 0; this.segmentDocCounts = context.bigArrays().newIntArray(1, true); } - // bucketOrd is ord + 1 to avoid a branch to deal with the missing ord @Override - protected LeafBucketCollector newCollector(final SortedSetDocValues ords, - LeafBucketCollector sub) { - segmentDocCounts = context.bigArrays().grow(segmentDocCounts, 1 + ords.getValueCount()); + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (segmentOrds != null) { + mapSegmentCountsToGlobalCounts(); + } + globalOrds = valuesSource.globalOrdinalsValues(ctx); + segmentOrds = valuesSource.ordinalsValues(ctx); + segmentDocCounts = context.bigArrays().grow(segmentDocCounts, 1 + segmentOrds.getValueCount()); assert sub == LeafBucketCollector.NO_OP_COLLECTOR; - final SortedDocValues singleValues = DocValues.unwrapSingleton(ords); + final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds); if (singleValues != null) { - return new LeafBucketCollectorBase(sub, ords) { + return new LeafBucketCollectorBase(sub, segmentOrds) { @Override public void collect(int doc, long bucket) throws IOException { assert bucket == 0; @@ -367,14 +352,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } }; } else { - return new LeafBucketCollectorBase(sub, ords) { + return new LeafBucketCollectorBase(sub, segmentOrds) { @Override public void collect(int doc, long bucket) throws IOException { assert bucket == 0; - if (ords.advanceExact(doc)) { - for (long segmentOrd = ords.nextOrd(); - segmentOrd != SortedSetDocValues.NO_MORE_ORDS; - segmentOrd = ords.nextOrd()) { + if (segmentOrds.advanceExact(doc)) { + for (long segmentOrd = segmentOrds.nextOrd(); segmentOrd != NO_MORE_ORDS; segmentOrd = segmentOrds.nextOrd()) { segmentDocCounts.increment(segmentOrd + 1, 1); } } @@ -383,18 +366,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } } - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final 
LeafBucketCollector sub) throws IOException { - if (segmentOrds != null) { - mapSegmentCountsToGlobalCounts(); - } - - globalOrds = valuesSource.globalOrdinalsValues(ctx); - segmentOrds = valuesSource.ordinalsValues(ctx); - return newCollector(segmentOrds, sub); - } - @Override protected void doPostCollection() { if (segmentOrds != null) { @@ -426,7 +397,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } final long ord = i - 1; // remember we do +1 when counting final long globalOrd = mapping == null ? ord : mapping.getGlobalOrd(ord); - incrementBucketDocCount(globalOrd, inc); + long bucketOrd = getBucketOrd(globalOrd); + incrementBucketDocCount(bucketOrd, inc); } } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 9a06dfe66f5..ca2974a1059 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -53,10 +53,18 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory config, BucketOrder order, - IncludeExclude includeExclude, String executionHint, SubAggCollectionMode collectMode, - TermsAggregator.BucketCountThresholds bucketCountThresholds, boolean showTermDocCountError, SearchContext context, - AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map metaData) throws IOException { + public TermsAggregatorFactory(String name, + ValuesSourceConfig config, + BucketOrder order, + IncludeExclude includeExclude, + String executionHint, + SubAggCollectionMode collectMode, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + boolean showTermDocCountError, + SearchContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { super(name, config, context, parent, subFactoriesBuilder, metaData); this.order = order; this.includeExclude = includeExclude; @@ -225,14 +233,24 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory pipelineAggregators, Map metaData) - throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + BucketOrder order, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext context, + Aggregator parent, + SubAggCollectionMode subAggCollectMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.StringFilter filter = includeExclude == null ? 
null : includeExclude.convertToStringFilter(format); return new StringTermsAggregator(name, factories, valuesSource, order, format, bucketCountThresholds, filter, context, parent, subAggCollectMode, showTermDocCountError, pipelineAggregators, metaData); + } @Override @@ -244,15 +262,24 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory pipelineAggregators, Map metaData) - throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + BucketOrder order, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext context, Aggregator parent, + SubAggCollectionMode subAggCollectMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format); return new GlobalOrdinalsStringTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, order, - format, bucketCountThresholds, filter, context, parent, subAggCollectMode, showTermDocCountError, + format, bucketCountThresholds, filter, context, parent, false, subAggCollectMode, showTermDocCountError, pipelineAggregators, metaData); + } @Override @@ -264,15 +291,25 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory pipelineAggregators, Map metaData) - throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + BucketOrder order, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext context, + Aggregator parent, + SubAggCollectionMode subAggCollectMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { + final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? 
null : includeExclude.convertToOrdinalsFilter(format); - return new GlobalOrdinalsStringTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, - order, format, bucketCountThresholds, filter, context, parent, subAggCollectMode, showTermDocCountError, - pipelineAggregators, metaData); + return new GlobalOrdinalsStringTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, + order, format, bucketCountThresholds, filter, context, parent, true, subAggCollectMode, + showTermDocCountError, pipelineAggregators, metaData); + } @Override @@ -283,21 +320,31 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory pipelineAggregators, Map metaData) - throws IOException { + Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + BucketOrder order, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext context, + Aggregator parent, + SubAggCollectionMode subAggCollectMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException { + if (includeExclude != null || factories.countAggregators() > 0 - // we need the FieldData impl to be able to extract the - // segment to global ord mapping + // we need the FieldData impl to be able to extract the + // segment to global ord mapping || valuesSource.getClass() != ValuesSource.Bytes.FieldData.class) { return GLOBAL_ORDINALS.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude, context, parent, subAggCollectMode, showTermDocCountError, pipelineAggregators, metaData); } return new GlobalOrdinalsStringTermsAggregator.LowCardinality(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, order, format, bucketCountThresholds, context, parent, - subAggCollectMode, showTermDocCountError, pipelineAggregators, metaData); + false, subAggCollectMode, showTermDocCountError, pipelineAggregators, metaData); + } @Override @@ -321,11 +368,19 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory pipelineAggregators, Map metaData) - throws IOException; + abstract Aggregator create(String name, + AggregatorFactories factories, + ValuesSource valuesSource, + BucketOrder order, + DocValueFormat format, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + IncludeExclude includeExclude, + SearchContext context, + Aggregator parent, + SubAggCollectionMode subAggCollectMode, + boolean showTermDocCountError, + List pipelineAggregators, + Map metaData) throws IOException; abstract boolean needsGlobalOrdinals(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java index 46e371a3dfe..dd0785b2d70 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java @@ -233,16 +233,14 @@ public class IncludeExclude implements Writeable, ToXContent { } public abstract static class OrdinalsFilter { - public abstract LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) - throws IOException; + public abstract LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) throws IOException; } class PartitionedOrdinalsFilter extends OrdinalsFilter { @Override - 
public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) - throws IOException { + public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) throws IOException { final long numOrds = globalOrdinals.getValueCount(); final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds); final TermsEnum termEnum = globalOrdinals.termsEnum(); @@ -271,8 +269,7 @@ public class IncludeExclude implements Writeable, ToXContent { * */ @Override - public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) - throws IOException { + public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) throws IOException { LongBitSet acceptedGlobalOrdinals = new LongBitSet(globalOrdinals.getValueCount()); TermsEnum globalTermsEnum; Terms globalTerms = new DocValuesTerms(globalOrdinals); @@ -297,8 +294,7 @@ public class IncludeExclude implements Writeable, ToXContent { } @Override - public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) - throws IOException { + public LongBitSet acceptedGlobalOrdinals(SortedSetDocValues globalOrdinals) throws IOException { LongBitSet acceptedGlobalOrdinals = new LongBitSet(globalOrdinals.getValueCount()); if (includeValues != null) { for (BytesRef term : includeValues) { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactoryTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactoryTests.java index 7d7ffac4fb3..fe32bff86b8 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactoryTests.java @@ -21,10 +21,11 @@ package org.elasticsearch.search.aggregations.bucket.terms; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.equalTo; -public class TermsAggregatorFactoryTests extends ESSingleNodeTestCase { +public class TermsAggregatorFactoryTests extends ESTestCase { public void testSubAggCollectMode() throws Exception { assertThat(TermsAggregatorFactory.subAggCollectionMode(Integer.MAX_VALUE, -1), equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST)); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index f54cb902d96..33e0c54d934 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -35,6 +35,8 @@ import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.BucketOrder; @@ -44,7 +46,53 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.Matchers.instanceOf; + public class 
TermsAggregatorTests extends AggregatorTestCase { + public void testGlobalOrdinalsExecutionHint() throws Exception { + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + indexWriter.close(); + IndexReader indexReader = DirectoryReader.open(directory); + // We do not use LuceneTestCase.newSearcher because we need a DirectoryReader + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name", ValueType.STRING) + .field("string") + .collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST); + MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType(); + fieldType.setName("string"); + fieldType.setHasDocValues(true); + + TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class)); + GlobalOrdinalsStringTermsAggregator globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator; + assertFalse(globalAgg.remapGlobalOrds()); + + aggregationBuilder + .subAggregation(AggregationBuilders.cardinality("card").field("string")); + aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class)); + globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator; + assertFalse(globalAgg.remapGlobalOrds()); + + aggregationBuilder + .order(BucketOrder.aggregation("card", true)); + aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class)); + globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator; + assertTrue(globalAgg.remapGlobalOrds()); + + aggregationBuilder = new TermsAggregationBuilder("_name", ValueType.STRING) + .field("string") + .executionHint("global_ordinals_hash"); + aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + assertThat(aggregator, instanceOf(GlobalOrdinalsStringTermsAggregator.class)); + globalAgg = (GlobalOrdinalsStringTermsAggregator) aggregator; + assertTrue(globalAgg.remapGlobalOrds()); + indexReader.close(); + directory.close(); + } public void testTermsAggregator() throws Exception { Directory directory = newDirectory(); diff --git a/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java b/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java index b09c177bf0b..9914938854d 100644 --- a/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/core/src/test/java/org/elasticsearch/search/profile/aggregation/AggregationProfilerIT.java @@ -154,7 +154,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { ProfileResult termsAggResult = histoAggResult.getProfiledChildren().get(0); assertThat(termsAggResult, notNullValue()); - assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName())); + assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.class.getName())); assertThat(termsAggResult.getLuceneDescription(), equalTo("terms")); assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); @@ -224,7 +224,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { ProfileResult termsAggResult = histoAggResult.getProfiledChildren().get(0); 
assertThat(termsAggResult, notNullValue()); - assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName())); + assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.class.getName())); assertThat(termsAggResult.getLuceneDescription(), equalTo("terms")); assertThat(termsAggResult.getTime(), greaterThan(0L)); Map termsBreakdown = termsAggResult.getTimeBreakdown(); @@ -355,7 +355,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { ProfileResult tagsAggResult = histoAggResult.getProfiledChildren().get(0); assertThat(tagsAggResult, notNullValue()); - assertThat(tagsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName())); + assertThat(tagsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.class.getName())); assertThat(tagsAggResult.getLuceneDescription(), equalTo("tags")); assertThat(tagsAggResult.getTime(), greaterThan(0L)); Map tagsBreakdown = tagsAggResult.getTimeBreakdown(); @@ -406,7 +406,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { ProfileResult stringsAggResult = histoAggResult.getProfiledChildren().get(1); assertThat(stringsAggResult, notNullValue()); - assertThat(stringsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName())); + assertThat(stringsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.class.getName())); assertThat(stringsAggResult.getLuceneDescription(), equalTo("strings")); assertThat(stringsAggResult.getTime(), greaterThan(0L)); Map stringsBreakdown = stringsAggResult.getTimeBreakdown(); @@ -457,7 +457,7 @@ public class AggregationProfilerIT extends ESIntegTestCase { tagsAggResult = stringsAggResult.getProfiledChildren().get(2); assertThat(tagsAggResult, notNullValue()); - assertThat(tagsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName())); + assertThat(tagsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.class.getName())); assertThat(tagsAggResult.getLuceneDescription(), equalTo("tags")); assertThat(tagsAggResult.getTime(), greaterThan(0L)); tagsBreakdown = tagsAggResult.getTimeBreakdown(); From b28141a9903d72396c52a4faa6d9cf3d01b72b57 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 13:19:44 -0400 Subject: [PATCH 012/208] Fill gaps on primary promotion When a primary is promoted, it could have gaps in its history due to concurrency and in-flight operations when it was serving as a replica. This commit fills the gaps in the history of the promoted shard after all operations from the previous term have drained, and future operations are blocked. This commit does not handle replicating the no-ops that fill the gaps to any remaining replicas; that is the responsibility of the primary/replica sync that we are laying the groundwork for.
Relates #24945 --- .../elasticsearch/index/shard/IndexShard.java | 5 +- .../index/shard/IndexShardTests.java | 69 ++++++++++++++++++- 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6a790261252..13abf553785 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -369,7 +369,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, - latch::await, + () -> { + latch.await(); + getEngine().fillSeqNoGaps(newPrimaryTerm); + }, e -> failShard("exception during primary term transition", e)); primaryTerm = newPrimaryTerm; latch.countDown(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e7aa3c61b4e..38cac70b5e3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -131,7 +131,9 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.VersionType.EXTERNAL; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; @@ -141,6 +143,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -388,6 +391,70 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } + public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { + final IndexShard indexShard = newStartedShard(false); + + // most of the time this is large enough that most of the time there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); + boolean gap = false; + for (int i = 0; i < operations; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); + if (!rarely()) { + final Term uid = new Term("_id", doc.id()); + final Engine.Index index = + new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false); + indexShard.index(index); + max = i; + } else { + gap = true; + } + } + + final int maxSeqNo = max; + if (gap) { + assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); + } + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), 
+ null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + /* + * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the + * gaps are filled. + */ + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC); + + latch.await(); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1172,7 +1239,7 @@ public class IndexShardTests extends IndexShardTestCase { test = otherShard.prepareIndexOnReplica( SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), XContentType.JSON), - 1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + 1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); otherShard.index(test); final ShardRouting primaryShardRouting = shard.routingEntry(); From 9b6b4ffe8e11ab807ab7ee70111d461f391c7771 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 15:34:20 -0400 Subject: [PATCH 013/208] Set number of processes in systemd unit file This commit sets the number of processes in the systemd unit file for Elasticsearch to meet the bootstrap checks. Relates #24970 --- distribution/src/main/packaging/systemd/elasticsearch.service | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distribution/src/main/packaging/systemd/elasticsearch.service b/distribution/src/main/packaging/systemd/elasticsearch.service index 0554371a1f9..45457069385 100644 --- a/distribution/src/main/packaging/systemd/elasticsearch.service +++ b/distribution/src/main/packaging/systemd/elasticsearch.service @@ -38,6 +38,9 @@ StandardError=inherit # Specifies the maximum file descriptor number that can be opened by this process LimitNOFILE=65536 +# Specifies the maximum number of processes +LimitNPROC=4096 + # Specifies the maximum number of bytes of memory that may be locked into RAM # Set to "infinity" if you use the 'bootstrap.memory_lock: true' option # in elasticsearch.yml and 'MAX_LOCKED_MEMORY=unlimited' in ${path.env} From ac94253dce99f12742085e02ed891dc397ba2272 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 16:22:17 -0400 Subject: [PATCH 014/208] Clarify acquiring index shard permit In previous work, we refactored the delay mechanism in index shard operation permits to allow for async delaying of acquisition. This refactoring made explicit when permit acquisition is disabled whereas previously we were relying on an implicit condition, namely that all permits were acquired by the thread trying to delay acquisition. When using the implicit mechanism, we tried to acquire a permit and if this failed, we returned a null releasable as an indication that our operation should be queued. Yet, now we know when we are delayed and we should not even try to acquire a permit. 
If we try to acquire a permit and one is not available, we know that we are not delayed, and so acquisition should be successful. If it is not successful, something is deeply wrong. This commit takes advantage of this refactoring to simplify the internal implementation. Relates #24971 --- .../shard/IndexShardOperationPermits.java | 13 +++++------ .../IndexShardOperationPermitsTests.java | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 83a372dd453..de539026e7a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable { private final Logger logger; private final ThreadPool threadPool; - private static final int TOTAL_PERMITS = Integer.MAX_VALUE; + static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; @@ -225,8 +224,7 @@ final class IndexShardOperationPermits implements Closeable { } return; } else { - releasable = tryAcquire(); - assert releasable != null; + releasable = acquire(); } } } catch (final InterruptedException e) { @@ -237,8 +235,7 @@ final class IndexShardOperationPermits implements Closeable { onAcquired.onResponse(releasable); } - @Nullable - private Releasable tryAcquire() throws InterruptedException { + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); @@ -247,8 +244,10 @@ final class IndexShardOperationPermits implements Closeable { semaphore.release(1); } }; + } else { + // this should never happen, if it does something is deeply wrong + throw new IllegalStateException("failed to obtain permit but operations are not delayed"); } - return null; } /** diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ec22f9d862b..41dc8f520cc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -533,6 +533,28 @@ public class IndexShardOperationPermitsTests extends ESTestCase { thread.join(); } + public void testNoPermitsRemaining() throws InterruptedException { + permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS); + final IllegalStateException e = 
expectThrows( + IllegalStateException.class, + () -> this.permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + assert false; + } + + @Override + public void onFailure(Exception e) { + assert false; + } + }, + ThreadPool.Names.GENERIC, + false)); + assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); + permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); + } + /** * Returns an operation that acquires a permit and synchronizes in the following manner: *