diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 4f478057fa0..a9b0ba58e09 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -583,7 +583,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte org.elasticsearch.index.query.QueryParsingException.class, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class, org.elasticsearch.index.engine.DeleteByQueryFailedEngineException.class, - org.elasticsearch.index.engine.ForceMergeFailedEngineException.class, org.elasticsearch.discovery.MasterNotDiscoveredException.class, org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException.class, org.elasticsearch.node.NodeClosedException.class, diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index 549ce8fd57b..846612dae47 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -74,7 +74,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction() { @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 21f0c7a13e2..0cb21055974 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; @@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.node.settings.NodeSettingsService; import java.util.*; @@ -559,6 +561,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * use the sorter to save some iterations. */ final AllocationDeciders deciders = allocation.deciders(); + final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); final Comparator comparator = new Comparator() { @Override public int compare(ShardRouting o1, @@ -570,7 +573,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if ((indexCmp = o1.index().compareTo(o2.index())) == 0) { return o1.getId() - o2.getId(); } - return indexCmp; + // this comparator is more expensive than all the others up there + // that's why it's added last even though it could be easier to read + // if we'd apply it earlier. this comparator will only differentiate across + // indices all shards of the same index is treated equally. + final int secondary = secondaryComparator.compare(o1, o2); + return secondary == 0 ? indexCmp : secondary; } }; /* diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 578f574744f..6b52531c18c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -989,7 +989,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } - private ClusterState rejoin(ClusterState clusterState, String reason) { + protected ClusterState rejoin(ClusterState clusterState, String reason) { // *** called from within an cluster state update task *** // assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME); diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 449bd67e26c..50a77e197f0 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -120,14 +120,7 @@ public class GatewayAllocator extends AbstractComponent { boolean changed = false; RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); - unassigned.sort(new PriorityComparator() { - - @Override - protected Settings getIndexSettings(String index) { - IndexMetaData indexMetaData = allocation.metaData().index(index); - return indexMetaData.getSettings(); - } - }); // sort for priority ordering + unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering changed |= primaryShardAllocator.allocateUnassigned(allocation); changed |= replicaShardAllocator.processExistingRecoveries(allocation); diff --git a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java index 2176a70c74b..4f70bf41488 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.Settings; import java.util.Comparator; @@ -33,7 +34,7 @@ import java.util.Comparator; * here the newer indices matter more). If even that is the same, we compare the index name which is useful * if the date is baked into the index name. ie logstash-2015.05.03. */ -abstract class PriorityComparator implements Comparator { +public abstract class PriorityComparator implements Comparator { @Override public final int compare(ShardRouting o1, ShardRouting o2) { @@ -63,4 +64,17 @@ abstract class PriorityComparator implements Comparator { } protected abstract Settings getIndexSettings(String index); + + /** + * Returns a PriorityComparator that uses the RoutingAllocation index metadata to access the index setting per index. + */ + public static PriorityComparator getAllocationComparator(final RoutingAllocation allocation) { + return new PriorityComparator() { + @Override + protected Settings getIndexSettings(String index) { + IndexMetaData indexMetaData = allocation.metaData().index(index); + return indexMetaData.getSettings(); + } + }; + } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index d22a7540491..88fa656bd64 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; +import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -38,7 +39,10 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; +import org.elasticsearch.index.fielddata.FieldDataType; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettings; @@ -150,8 +154,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); // inject workarounds for cyclic dep - indexFieldData.setIndexService(this); - bitSetFilterCache.setIndexService(this); + indexFieldData.setListener(new FieldDataCacheListener(this)); + bitSetFilterCache.setListener(new BitsetCacheListener(this)); this.nodeEnv = nodeEnv; } @@ -537,4 +541,62 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone public Settings getIndexSettings() { return indexSettings; } + + private static final class BitsetCacheListener implements BitsetFilterCache.Listener { + final IndexService indexService; + + private BitsetCacheListener(IndexService indexService) { + this.indexService = indexService; + } + + @Override + public void onCache(ShardId shardId, Accountable accountable) { + if (shardId != null) { + final IndexShard shard = indexService.shard(shardId.id()); + if (shard != null) { + long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l; + shard.shardBitsetFilterCache().onCached(ramBytesUsed); + } + } + } + + @Override + public void onRemoval(ShardId shardId, Accountable accountable) { + if (shardId != null) { + final IndexShard shard = indexService.shard(shardId.id()); + if (shard != null) { + long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l; + shard.shardBitsetFilterCache().onRemoval(ramBytesUsed); + } + } + } + } + + private final class FieldDataCacheListener implements IndexFieldDataCache.Listener { + final IndexService indexService; + + public FieldDataCacheListener(IndexService indexService) { + this.indexService = indexService; + } + + @Override + public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { + if (shardId != null) { + final IndexShard shard = indexService.shard(shardId.id()); + if (shard != null) { + shard.fieldData().onCache(shardId, fieldNames, fieldDataType, ramUsage); + } + } + } + + @Override + public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + if (shardId != null) { + final IndexShard shard = indexService.shard(shardId.id()); + if (shard != null) { + shard.fieldData().onRemoval(shardId, fieldNames, fieldDataType, wasEvicted, sizeInBytes); + } + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 360efa15455..58768cd08d1 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -33,6 +33,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.Weight; import org.apache.lucene.search.join.BitSetProducer; +import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BitDocIdSet; import org.apache.lucene.util.BitSet; import org.elasticsearch.ExceptionsHelper; @@ -43,7 +44,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.object.ObjectMapper; @@ -61,10 +61,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.*; /** * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time. @@ -76,12 +73,21 @@ import java.util.concurrent.Executor; public class BitsetFilterCache extends AbstractIndexComponent implements LeafReader.CoreClosedListener, RemovalListener>, Closeable { public static final String LOAD_RANDOM_ACCESS_FILTERS_EAGERLY = "index.load_fixed_bitset_filters_eagerly"; + private static final Listener DEFAULT_NOOP_LISTENER = new Listener() { + @Override + public void onCache(ShardId shardId, Accountable accountable) { + } + + @Override + public void onRemoval(ShardId shardId, Accountable accountable) { + } + }; private final boolean loadRandomAccessFiltersEagerly; private final Cache> loadedFilters; + private volatile Listener listener = DEFAULT_NOOP_LISTENER; private final BitSetProducerWarmer warmer; - private IndexService indexService; private IndicesWarmer indicesWarmer; @Inject @@ -95,16 +101,24 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea @Inject(optional = true) public void setIndicesWarmer(IndicesWarmer indicesWarmer) { this.indicesWarmer = indicesWarmer; - } - - public void setIndexService(IndexService indexService) { - this.indexService = indexService; - // First the indicesWarmer is set and then the indexService is set, because of this there is a small window of - // time where indexService is null. This is why the warmer should only registered after indexService has been set. - // Otherwise there is a small chance of the warmer running into a NPE, since it uses the indexService indicesWarmer.addListener(warmer); } + /** + * Sets a listener that is invoked for all subsequent cache and removal events. + * @throws IllegalStateException if the listener is set more than once + */ + public void setListener(Listener listener) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (this.listener != DEFAULT_NOOP_LISTENER) { + throw new IllegalStateException("can't set listener more than once"); + } + this.listener = listener; + } + + public BitSetProducer getBitSetProducer(Query query) { return new QueryWrapperBitSetProducer(query); } @@ -116,7 +130,9 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea @Override public void close() { - indicesWarmer.removeListener(warmer); + if (indicesWarmer != null) { + indicesWarmer.removeListener(warmer); + } clear("close"); } @@ -135,31 +151,22 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea return CacheBuilder.newBuilder().build(); } }); - return filterToFbs.get(query, new Callable() { - @Override - public Value call() throws Exception { - final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); - final IndexSearcher searcher = new IndexSearcher(topLevelContext); - searcher.setQueryCache(null); - final Weight weight = searcher.createNormalizedWeight(query, false); - final DocIdSetIterator it = weight.scorer(context); - final BitSet bitSet; - if (it == null) { - bitSet = null; - } else { - bitSet = BitSet.of(it, context.reader().maxDoc()); - } - - Value value = new Value(bitSet, shardId); - if (shardId != null) { - IndexShard shard = indexService.shard(shardId.id()); - if (shard != null) { - long ramBytesUsed = value.bitset != null ? value.bitset.ramBytesUsed() : 0l; - shard.shardBitsetFilterCache().onCached(ramBytesUsed); - } - } - return value; + return filterToFbs.get(query, () -> { + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createNormalizedWeight(query, false); + final DocIdSetIterator it = weight.scorer(context); + final BitSet bitSet; + if (it == null) { + bitSet = null; + } else { + bitSet = BitSet.of(it, context.reader().maxDoc()); } + + Value value = new Value(bitSet, shardId); + listener.onCache(shardId, value.bitset); + return value; }).bitset; } @@ -170,20 +177,13 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea return; } - Cache value = notification.getValue(); - if (value == null) { + Cache valueCache = notification.getValue(); + if (valueCache == null) { return; } - for (Map.Entry entry : value.asMap().entrySet()) { - if (entry.getValue().shardId == null) { - continue; - } - IndexShard shard = indexService.shard(entry.getValue().shardId.id()); - if (shard != null) { - ShardBitsetFilterCache shardBitsetFilterCache = shard.shardBitsetFilterCache(); - shardBitsetFilterCache.onRemoval(entry.getValue().bitset.ramBytesUsed()); - } + for (Value value : valueCache.asMap().values()) { + listener.onRemoval(value.shardId, value.bitset); // if null then this means the shard has already been removed and the stats are 0 anyway for the shard this key belongs to } } @@ -266,32 +266,22 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size()); for (final LeafReaderContext ctx : context.searcher().reader().leaves()) { for (final Query filterToWarm : warmUp) { - executor.execute(new Runnable() { - - @Override - public void run() { - try { - final long start = System.nanoTime(); - getAndLoadIfNotPresent(filterToWarm, ctx); - if (indexShard.warmerService().logger().isTraceEnabled()) { - indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); - } - } catch (Throwable t) { - indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); - } finally { - latch.countDown(); + executor.execute(() -> { + try { + final long start = System.nanoTime(); + getAndLoadIfNotPresent(filterToWarm, ctx); + if (indexShard.warmerService().logger().isTraceEnabled()) { + indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start)); } + } catch (Throwable t) { + indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm); + } finally { + latch.countDown(); } - }); } } - return new TerminationHandle() { - @Override - public void awaitTermination() throws InterruptedException { - latch.await(); - } - }; + return () -> latch.await(); } @Override @@ -304,4 +294,25 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea Cache> getLoadedFilters() { return loadedFilters; } + + + /** + * A listener interface that is executed for each onCache / onRemoval event + */ + public interface Listener { + /** + * Called for each cached bitset on the cache event. + * @param shardId the shard id the bitset was cached for. This can be null + * @param accountable the bitsets ram representation + */ + void onCache(ShardId shardId, Accountable accountable); + /** + * Called for each cached bitset on the removal event. + * @param shardId the shard id the bitset was cached for. This can be null + * @param accountable the bitsets ram representation + */ + void onRemoval(ShardId shardId, Accountable accountable); + } + + } diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java index 730539974ff..6e0551a8905 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/ShardBitsetFilterCache.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.cache.bitset; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; @@ -47,4 +46,5 @@ public class ShardBitsetFilterCache extends AbstractIndexShardComponent { public long getMemorySizeInBytes() { return totalMetric.count(); } + } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 071a00ba65c..54cbccadf25 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -502,14 +502,14 @@ public abstract class Engine implements Closeable { /** * Optimizes to 1 segment */ - public void forceMerge(boolean flush) { + public void forceMerge(boolean flush) throws IOException { forceMerge(flush, 1, false, false, false); } /** * Triggers a forced merge on this engine */ - public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException; + public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException; /** * Snapshots the index and returns a handle to it. If needed will try and "commit" the diff --git a/core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java b/core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java deleted file mode 100644 index 7aac909c9e8..00000000000 --- a/core/src/main/java/org/elasticsearch/index/engine/ForceMergeFailedEngineException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * - */ -public class ForceMergeFailedEngineException extends EngineException { - - public ForceMergeFailedEngineException(ShardId shardId, Throwable t) { - super(shardId, "force merge failed", t); - } - - public ForceMergeFailedEngineException(StreamInput in) throws IOException{ - super(in); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 054b0b1dfff..512153b909a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -45,6 +45,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; @@ -823,7 +824,7 @@ public class InternalEngine extends Engine { @Override public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, - final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException { + final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException { /* * We do NOT acquire the readlock here since we are waiting on the merges to finish * that's fine since the IW.rollback should stop all the threads and trigger an IOException @@ -865,9 +866,8 @@ public class InternalEngine extends Engine { store.decRef(); } } catch (Throwable t) { - ForceMergeFailedEngineException ex = new ForceMergeFailedEngineException(shardId, t); - maybeFailEngine("force merge", ex); - throw ex; + maybeFailEngine("force merge", t); + throw t; } finally { try { mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 76d9c24da29..86b71c13a46 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.shard.ShardId; /** * A simple field data cache abstraction on the *index* level. @@ -44,13 +45,19 @@ public interface IndexFieldDataCache { */ void clear(String fieldName); - void clear(Object coreCacheKey); + void clear(IndexReader reader); interface Listener { - void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage); + /** + * Called after the fielddata is loaded during the cache phase + */ + void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage); - void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes); + /** + * Called after the fielddata is unloaded + */ + void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes); } class None implements IndexFieldDataCache { @@ -75,8 +82,7 @@ public interface IndexFieldDataCache { } @Override - public void clear(Object coreCacheKey) { - + public void clear(IndexReader reader) { } } } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index d0cf9b7da02..8f69e879e4c 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.lucene.util.Accountable; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.common.collect.MapBuilder; @@ -32,11 +33,12 @@ import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.fielddata.plain.*; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.core.BooleanFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -139,8 +141,18 @@ public class IndexFieldDataService extends AbstractIndexComponent { private final IndicesFieldDataCache indicesFieldDataCache; // the below map needs to be modified under a lock private final Map fieldDataCaches = Maps.newHashMap(); + private final MapperService mapperService; + private static final IndexFieldDataCache.Listener DEFAULT_NOOP_LISTENER = new IndexFieldDataCache.Listener() { + @Override + public void onCache(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { + } + + @Override + public void onRemoval(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + } + }; + private volatile IndexFieldDataCache.Listener listener = DEFAULT_NOOP_LISTENER; - IndexService indexService; // We need to cache fielddata on the _parent field because of 1.x indices. // When we don't support 1.x anymore (3.0) then remove this caching @@ -149,15 +161,11 @@ public class IndexFieldDataService extends AbstractIndexComponent { @Inject public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, MapperService mapperService) { super(index, indexSettings); this.indicesFieldDataCache = indicesFieldDataCache; this.circuitBreakerService = circuitBreakerService; - } - - // we need to "inject" the index service to not create cyclic dep - public void setIndexService(IndexService indexService) { - this.indexService = indexService; + this.mapperService = mapperService; } public synchronized void clear() { @@ -229,7 +237,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { // this means changing the node level settings is simple, just set the bounds there String cacheType = type.getSettings().get("cache", indexSettings.get(FIELDDATA_CACHE_KEY, FIELDDATA_CACHE_VALUE_NODE)); if (FIELDDATA_CACHE_VALUE_NODE.equals(cacheType)) { - cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type); + cache = indicesFieldDataCache.buildIndexFieldDataCache(listener, index, fieldNames, type); } else if ("none".equals(cacheType)){ cache = new IndexFieldDataCache.None(); } else { @@ -243,13 +251,29 @@ public class IndexFieldDataService extends AbstractIndexComponent { && Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1); if (isOldParentField) { if (parentIndexFieldData == null) { - parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService()); + parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService); } return (IFD) parentIndexFieldData; } } - return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService()); + return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService); + } + + /** + * Sets a {@link org.elasticsearch.index.fielddata.IndexFieldDataCache.Listener} passed to each {@link IndexFieldData} + * creation to capture onCache and onRemoval events. Setting a listener on this method will override any previously + * set listeners. + * @throws IllegalStateException if the listener is set more than once + */ + public void setListener(IndexFieldDataCache.Listener listener) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (this.listener != DEFAULT_NOOP_LISTENER) { + throw new IllegalStateException("can't set listener more than once"); + } + this.listener = listener; } } diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java b/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java index 4b1f42060a6..8fccda94d80 100644 --- a/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java +++ b/core/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java @@ -56,7 +56,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener { } @Override - public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { + public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { totalMetric.inc(ramUsage.ramBytesUsed()); String keyFieldName = fieldNames.indexName(); CounterMetric total = perFieldTotals.get(keyFieldName); @@ -73,7 +73,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener { } @Override - public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { if (wasEvicted) { evictionsMetric.inc(); } diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java index 91ff1de78e7..7add3d3f13c 100644 --- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java +++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.indexing.IndexingOperationListener; @@ -82,7 +83,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener(); private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener(); private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false); - private boolean mapUnmappedFieldsAsString; public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService, diff --git a/core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java b/core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java index c8d6da065cc..f152d226d51 100644 --- a/core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java +++ b/core/src/main/java/org/elasticsearch/index/query/QueryParseContext.java @@ -395,5 +395,4 @@ public class QueryParseContext { public Version indexVersionCreated() { return indexVersionCreated; } - } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2101ed03e63..2bc4ae1f666 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -697,7 +697,7 @@ public class IndexShard extends AbstractIndexShardComponent { } - public void optimize(OptimizeRequest optimize) { + public void optimize(OptimizeRequest optimize) throws IOException { verifyStarted(); if (logger.isTraceEnabled()) { logger.trace("optimize with {}", optimize); @@ -708,7 +708,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** * Upgrades the shard to the current version of Lucene and returns the minimum segment version */ - public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) { + public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOException { verifyStarted(); if (logger.isTraceEnabled()) { logger.trace("upgrade with {}", upgrade); diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 1bf020315c5..2a2aef4d691 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.util.Accountable; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -35,17 +36,13 @@ import org.elasticsearch.index.fielddata.AtomicFieldData; import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; /** */ @@ -95,8 +92,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL this.closed = true; } - public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) { - return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType); + public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) { + return new IndexFieldCache(logger, cache, index, fieldNames, fieldDataType, indicesFieldDataCacheListener, listener); } public Cache getCache() { @@ -111,7 +108,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL final Accountable value = notification.getValue(); for (IndexFieldDataCache.Listener listener : key.listeners) { try { - listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed()); + listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed()); } catch (Throwable e) { // load anyway since listeners should not throw exceptions logger.error("Failed to call listener on field data cache unloading", e); @@ -133,96 +130,78 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL */ static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener { private final ESLogger logger; - private final IndexService indexService; final Index index; final MappedFieldType.Names fieldNames; final FieldDataType fieldDataType; private final Cache cache; - private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; + private final Listener[] listeners; - IndexFieldCache(ESLogger logger,final Cache cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) { + IndexFieldCache(ESLogger logger,final Cache cache, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Listener... listeners) { this.logger = logger; - this.indexService = indexService; + this.listeners = listeners; this.index = index; this.fieldNames = fieldNames; this.fieldDataType = fieldDataType; this.cache = cache; - this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; - assert indexService != null; } @Override public > FD load(final LeafReaderContext context, final IFD indexFieldData) throws Exception { - final Key key = new Key(this, context.reader().getCoreCacheKey()); + final ShardId shardId = ShardUtils.extractShardId(context.reader()); + final Key key = new Key(this, context.reader().getCoreCacheKey(), shardId); //noinspection unchecked - final Accountable accountable = cache.get(key, new Callable() { - @Override - public AtomicFieldData call() throws Exception { - context.reader().addCoreClosedListener(IndexFieldCache.this); - - key.listeners.add(indicesFieldDataCacheListener); - final ShardId shardId = ShardUtils.extractShardId(context.reader()); - if (shardId != null) { - final IndexShard shard = indexService.shard(shardId.id()); - if (shard != null) { - key.listeners.add(shard.fieldData()); - } - } - final AtomicFieldData fieldData = indexFieldData.loadDirect(context); - for (Listener listener : key.listeners) { - try { - listener.onLoad(fieldNames, fieldDataType, fieldData); - } catch (Throwable e) { - // load anyway since listeners should not throw exceptions - logger.error("Failed to call listener on atomic field data loading", e); - } - } - return fieldData; + final Accountable accountable = cache.get(key, () -> { + context.reader().addCoreClosedListener(IndexFieldCache.this); + for (Listener listener : this.listeners) { + key.listeners.add(listener); } + final AtomicFieldData fieldData = indexFieldData.loadDirect(context); + for (Listener listener : key.listeners) { + try { + listener.onCache(shardId, fieldNames, fieldDataType, fieldData); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on atomic field data loading", e); + } + } + return fieldData; }); return (FD) accountable; } @Override public > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception { - final Key key = new Key(this, indexReader.getCoreCacheKey()); + final ShardId shardId = ShardUtils.extractShardId(indexReader); + final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId); //noinspection unchecked - final Accountable accountable = cache.get(key, new Callable() { - @Override - public Accountable call() throws Exception { - indexReader.addReaderClosedListener(IndexFieldCache.this); - key.listeners.add(indicesFieldDataCacheListener); - final ShardId shardId = ShardUtils.extractShardId(indexReader); - if (shardId != null) { - IndexShard shard = indexService.shard(shardId.id()); - if (shard != null) { - key.listeners.add(shard.fieldData()); - } - } - final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader); - for (Listener listener : key.listeners) { - try { - listener.onLoad(fieldNames, fieldDataType, ifd); - } catch (Throwable e) { - // load anyway since listeners should not throw exceptions - logger.error("Failed to call listener on global ordinals loading", e); - } - } - return ifd; + final Accountable accountable = cache.get(key, () -> { + indexReader.addReaderClosedListener(IndexFieldCache.this); + for (Listener listener : this.listeners) { + key.listeners.add(listener); } + final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader); + for (Listener listener : key.listeners) { + try { + listener.onCache(shardId, fieldNames, fieldDataType, ifd); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on global ordinals loading", e); + } + } + return ifd; }); return (IFD) accountable; } @Override public void onClose(Object coreKey) { - cache.invalidate(new Key(this, coreKey)); + cache.invalidate(new Key(this, coreKey, null)); // don't call cache.cleanUp here as it would have bad performance implications } @Override public void onClose(IndexReader reader) { - cache.invalidate(new Key(this, reader.getCoreCacheKey())); + cache.invalidate(new Key(this, reader.getCoreCacheKey(), null)); // don't call cache.cleanUp here as it would have bad performance implications } @@ -263,8 +242,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL } @Override - public void clear(Object coreCacheKey) { - cache.invalidate(new Key(this, coreCacheKey)); + public void clear(IndexReader indexReader) { + cache.invalidate(new Key(this, indexReader.getCoreCacheKey(), null)); // don't call cache.cleanUp here as it would have bad performance implications } } @@ -272,13 +251,14 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL public static class Key { public final IndexFieldCache indexCache; public final Object readerKey; + public final ShardId shardId; public final List listeners = new ArrayList<>(); - - Key(IndexFieldCache indexCache, Object readerKey) { + Key(IndexFieldCache indexCache, Object readerKey, @Nullable ShardId shardId) { this.indexCache = indexCache; this.readerKey = readerKey; + this.shardId = shardId; } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java index eff99a26e1a..75cfeb9572d 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.fielddata.cache; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.Accountable; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; @@ -26,6 +27,7 @@ import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.CircuitBreakerService; /** @@ -44,11 +46,11 @@ public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listen } @Override - public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) { + public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) { } @Override - public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]"; circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes); } diff --git a/core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java b/core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java index a7b9043f350..d942cc20409 100644 --- a/core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java +++ b/core/src/test/java/org/elasticsearch/bootstrap/BootstrapForTesting.java @@ -61,13 +61,7 @@ public class BootstrapForTesting { try { JarHell.checkJarHell(); } catch (Exception e) { - if (Boolean.parseBoolean(System.getProperty("tests.maven"))) { - throw new RuntimeException("found jar hell in test classpath", e); - } else { - Loggers.getLogger(BootstrapForTesting.class) - .warn("Your ide or custom test runner has jar hell issues, " + - "you might want to look into that", e); - } + throw new RuntimeException("found jar hell in test classpath", e); } // make sure java.io.tmpdir exists always (in case code uses it in a static initializer) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java new file mode 100644 index 00000000000..6fbc92a66ad --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.test.ESAllocationTestCase; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + +public class AllocationPriorityTests extends ESAllocationTestCase { + + /** + * Tests that higher prioritized primaries and replicas are allocated first even on the balanced shard allocator + * See https://github.com/elastic/elasticsearch/issues/13249 for details + */ + public void testPrioritizedIndicesAllocatedFirst() { + AllocationService allocation = createAllocationService(settingsBuilder(). + put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 1).build()); + final String highPriorityName; + final String lowPriorityName; + final int priorityFirst; + final int prioritySecond; + if (randomBoolean()) { + highPriorityName = "first"; + lowPriorityName = "second"; + prioritySecond = 1; + priorityFirst = 100; + } else { + lowPriorityName = "first"; + highPriorityName = "second"; + prioritySecond = 100; + priorityFirst = 1; + } + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("first").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, priorityFirst)).numberOfShards(2).numberOfReplicas(1)) + .put(IndexMetaData.builder("second").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, prioritySecond)).numberOfShards(2).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("first")) + .addAsNew(metaData.index("second")) + .build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + routingTable = allocation.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + } +} diff --git a/core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java b/core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java index 380e8b1a57e..6a9608619ff 100644 --- a/core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java +++ b/core/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java @@ -34,14 +34,21 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryWrapperFilter; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.join.BitSetProducer; +import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.junit.Test; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -109,4 +116,91 @@ public class BitSetFilterCacheTests extends ESTestCase { assertThat(cache.getLoadedFilters().size(), equalTo(0l)); } + public void testListener() throws IOException { + IndexWriter writer = new IndexWriter( + new RAMDirectory(), + new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()) + ); + Document document = new Document(); + document.add(new StringField("field", "value", Field.Store.NO)); + writer.addDocument(document); + writer.commit(); + final DirectoryReader writerReader = DirectoryReader.open(writer, false); + final IndexReader reader = randomBoolean() ? writerReader : ElasticsearchDirectoryReader.wrap(writerReader, new ShardId("test", 0)); + + final AtomicLong stats = new AtomicLong(); + final AtomicInteger onCacheCalls = new AtomicInteger(); + final AtomicInteger onRemoveCalls = new AtomicInteger(); + + final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY); + cache.setListener(new BitsetFilterCache.Listener() { + @Override + public void onCache(ShardId shardId, Accountable accountable) { + onCacheCalls.incrementAndGet(); + stats.addAndGet(accountable.ramBytesUsed()); + if (writerReader != reader) { + assertNotNull(shardId); + assertEquals("test", shardId.index().name()); + assertEquals(0, shardId.id()); + } else { + assertNull(shardId); + } + } + + @Override + public void onRemoval(ShardId shardId, Accountable accountable) { + onRemoveCalls.incrementAndGet(); + stats.addAndGet(-accountable.ramBytesUsed()); + if (writerReader != reader) { + assertNotNull(shardId); + assertEquals("test", shardId.index().name()); + assertEquals(0, shardId.id()); + } else { + assertNull(shardId); + } + } + }); + BitSetProducer filter = cache.getBitSetProducer(new QueryWrapperFilter(new TermQuery(new Term("field", "value")))); + assertThat(matchCount(filter, reader), equalTo(1)); + assertTrue(stats.get() > 0); + assertEquals(1, onCacheCalls.get()); + assertEquals(0, onRemoveCalls.get()); + IOUtils.close(reader, writer); + assertEquals(1, onRemoveCalls.get()); + assertEquals(0, stats.get()); + } + + public void testSetListenerTwice() { + final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY); + cache.setListener(new BitsetFilterCache.Listener() { + + @Override + public void onCache(ShardId shardId, Accountable accountable) { + + } + + @Override + public void onRemoval(ShardId shardId, Accountable accountable) { + + } + }); + try { + cache.setListener(new BitsetFilterCache.Listener() { + + @Override + public void onCache(ShardId shardId, Accountable accountable) { + + } + + @Override + public void onRemoval(ShardId shardId, Accountable accountable) { + + } + }); + fail("can't set it twice"); + } catch (IllegalStateException ex) { + // all is well + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index deebc4511c0..0c154daf022 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Base64; @@ -94,6 +95,7 @@ import java.nio.file.Path; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; @@ -1000,8 +1002,7 @@ public class InternalEngineTests extends ESTestCase { indexed.countDown(); try { engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean()); - } catch (ForceMergeFailedEngineException ex) { - // ok + } catch (IOException e) { return; } } @@ -2019,4 +2020,42 @@ public class InternalEngineTests extends ESTestCase { assertThat(topDocs.totalHits, equalTo(numDocs)); } } + + public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException { + AtomicReference throwable = new AtomicReference<>(); + String operation = randomFrom("optimize", "refresh", "flush"); + Thread mergeThread = new Thread() { + @Override + public void run() { + boolean stop = false; + logger.info("try with {}", operation); + while (stop == false) { + try { + switch (operation) { + case "optimize": { + engine.forceMerge(true, 1, false, false, false); + break; + } + case "refresh": { + engine.refresh("test refresh"); + break; + } + case "flush": { + engine.flush(true, false); + break; + } + } + } catch (Throwable t) { + throwable.set(t); + stop = true; + } + } + } + }; + mergeThread.start(); + engine.close(); + mergeThread.join(); + logger.info("exception caught: ", throwable.get()); + assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get())); + } } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java index a0f51a71113..ff072233ea6 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java @@ -19,8 +19,8 @@ package org.elasticsearch.index.fielddata; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.*; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Strings; @@ -273,5 +273,4 @@ public abstract class AbstractFieldDataImplTestCase extends AbstractFieldDataTes } protected abstract void fillExtendedMvSet() throws Exception; - } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java index a4f000ebc86..d42c86cbc03 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataTestCase.java @@ -54,7 +54,6 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { protected LeafReaderContext readerContext; protected IndexReader topLevelReader; protected IndicesFieldDataCache indicesFieldDataCache; - protected abstract FieldDataType getFieldDataType(); protected boolean hasDocValues() { @@ -109,11 +108,12 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy())); } - protected LeafReaderContext refreshReader() throws Exception { + protected final LeafReaderContext refreshReader() throws Exception { if (readerContext != null) { readerContext.reader().close(); } - LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader = DirectoryReader.open(writer, true)); + topLevelReader = DirectoryReader.open(writer, true); + LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader); readerContext = reader.getContext(); return readerContext; } @@ -150,8 +150,5 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase { } previous = current; } - - } - } diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java index f17f20dcc77..6c3054f379f 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java @@ -26,11 +26,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.RandomAccessOrds; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.index.*; import org.apache.lucene.search.Filter; import org.apache.lucene.search.FilteredQuery; import org.apache.lucene.search.IndexSearcher; diff --git a/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java b/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java index 002a82ef355..4c3dd025aac 100644 --- a/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/fielddata/IndexFieldDataServiceTests.java @@ -25,6 +25,9 @@ import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.StringField; import org.apache.lucene.index.*; import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Accountable; +import org.elasticsearch.common.lucene.index.ESDirectoryReaderTests; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.fielddata.plain.*; import org.elasticsearch.index.mapper.ContentPath; @@ -33,12 +36,16 @@ import org.elasticsearch.index.mapper.Mapper.BuilderContext; import org.elasticsearch.index.mapper.MapperBuilders; import org.elasticsearch.index.mapper.core.*; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.test.ESSingleNodeTestCase; import java.util.Arrays; import java.util.Collections; import java.util.IdentityHashMap; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.instanceOf; @@ -158,4 +165,77 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase { writer.getDirectory().close(); } + public void testFieldDataCacheListener() throws Exception { + final IndexService indexService = createIndex("test"); + IndexFieldDataService shardPrivateService = indexService.fieldData(); + // copy the ifdService since we can set the listener only once. + final IndexFieldDataService ifdService = new IndexFieldDataService(shardPrivateService.index(), shardPrivateService.indexSettings(), + getInstanceFromNode(IndicesFieldDataCache.class), getInstanceFromNode(CircuitBreakerService.class), indexService.mapperService()); + + final BuilderContext ctx = new BuilderContext(indexService.settingsService().getSettings(), new ContentPath(1)); + final MappedFieldType mapper1 = MapperBuilders.stringField("s").tokenized(false).docValues(true).fieldDataSettings(Settings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx).fieldType(); + final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new KeywordAnalyzer())); + Document doc = new Document(); + doc.add(new StringField("s", "thisisastring", Store.NO)); + writer.addDocument(doc); + DirectoryReader open = DirectoryReader.open(writer, true); + final boolean wrap = randomBoolean(); + final IndexReader reader = wrap ? ElasticsearchDirectoryReader.wrap(open, new ShardId("test", 1)) : open; + final AtomicInteger onCacheCalled = new AtomicInteger(); + final AtomicInteger onRemovalCalled = new AtomicInteger(); + ifdService.setListener(new IndexFieldDataCache.Listener() { + @Override + public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { + if (wrap) { + assertEquals(new ShardId("test", 1), shardId); + } else { + assertNull(shardId); + } + onCacheCalled.incrementAndGet(); + } + + @Override + public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + if (wrap) { + assertEquals(new ShardId("test", 1), shardId); + } else { + assertNull(shardId); + } + onRemovalCalled.incrementAndGet(); + } + }); + IndexFieldData ifd = ifdService.getForField(mapper1); + LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0); + AtomicFieldData load = ifd.load(leafReaderContext); + assertEquals(1, onCacheCalled.get()); + assertEquals(0, onRemovalCalled.get()); + reader.close(); + load.close(); + writer.close(); + assertEquals(1, onCacheCalled.get()); + assertEquals(1, onRemovalCalled.get()); + ifdService.clear(); + } + + public void testSetCacheListenerTwice() { + final IndexService indexService = createIndex("test"); + IndexFieldDataService shardPrivateService = indexService.fieldData(); + try { + shardPrivateService.setListener(new IndexFieldDataCache.Listener() { + @Override + public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) { + + } + + @Override + public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) { + + } + }); + fail("listener already set"); + } catch (IllegalStateException ex) { + // all well + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java b/core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java index 99a9799e0ed..89fa1b5f389 100644 --- a/core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java +++ b/core/src/test/java/org/elasticsearch/index/search/child/ChildrenConstantScoreQueryTests.java @@ -25,16 +25,7 @@ import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.RandomIndexWriter; -import org.apache.lucene.index.SlowCompositeReaderWrapper; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.index.*; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Filter; import org.apache.lucene.search.IndexSearcher; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index 2f8103fd9b7..56eb619cc80 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.internal.TypeFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketCollector; @@ -113,7 +114,7 @@ public class NestedAggregatorTests extends ESSingleNodeTestCase { indexWriter.commit(); indexWriter.close(); - DirectoryReader directoryReader = DirectoryReader.open(directory); + DirectoryReader directoryReader = DirectoryReader.open(directory); IndexSearcher searcher = new IndexSearcher(directoryReader); IndexService indexService = createIndex("test"); diff --git a/dev-tools/validate-maven-repository.py b/dev-tools/validate-maven-repository.py new file mode 100644 index 00000000000..cd457e118e9 --- /dev/null +++ b/dev-tools/validate-maven-repository.py @@ -0,0 +1,130 @@ +# Licensed to Elasticsearch under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on +# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +# Helper python script to check if a sonatype staging repo contains +# all the required files compared to a local repository +# +# The script does the following steps +# +# 1. Scans the local maven repo for all files in /org/elasticsearch +# 2. Opens a HTTP connection to the staging repo +# 3. Executes a HEAD request for each file found in step one +# 4. Compares the content-length response header with the real file size +# 5. Return an error if those two numbers differ +# +# A pre requirement to run this, is to find out via the oss.sonatype.org web UI, how that repo is named +# - After logging in you go to 'Staging repositories' and search for the one you just created +# - Click into the `Content` tab +# - Open any artifact (not a directory) +# - Copy the link of `Repository Path` on the right and reuse that part of the URL +# +# Alternatively you can just use the name of the repository and reuse the rest (ie. the repository +# named for the example below would have been named orgelasticsearch-1012) +# +# +# Example call +# python dev-tools/validate-maven-repository.py /path/to/repo/org/elasticsearch/ \ +# https://oss.sonatype.org/service/local/repositories/orgelasticsearch-1012/content/org/elasticsearch + +import sys +import os +import httplib +import urlparse +import re + +# Draw a simple progress bar, a couple of hundred HEAD requests might take a while +# Note, when drawing this, it uses the carriage return character, so you should not +# write anything in between +def drawProgressBar(percent, barLen = 40): + sys.stdout.write("\r") + progress = "" + for i in range(barLen): + if i < int(barLen * percent): + progress += "=" + else: + progress += " " + sys.stdout.write("[ %s ] %.2f%%" % (progress, percent * 100)) + sys.stdout.flush() + +if __name__ == "__main__": + if len(sys.argv) != 3: + print 'Usage: %s [user:pass]' % (sys.argv[0]) + print '' + print 'Example: %s /tmp/my-maven-repo/org/elasticsearch https://oss.sonatype.org/service/local/repositories/orgelasticsearch-1012/content/org/elasticsearch' % (sys.argv[0]) + else: + sys.argv[1] = re.sub('/$', '', sys.argv[1]) + sys.argv[2] = re.sub('/$', '', sys.argv[2]) + + localMavenRepo = sys.argv[1] + endpoint = sys.argv[2] + + filesToCheck = [] + foundSignedFiles = False + + for root, dirs, files in os.walk(localMavenRepo): + for file in files: + # no metadata files (they get renamed from maven-metadata-local.xml to maven-metadata.xml while deploying) + # no .properties and .repositories files (they dont get uploaded) + if not file.startswith('maven-metadata') and not file.endswith('.properties') and not file.endswith('.repositories'): + filesToCheck.append(os.path.join(root, file)) + if file.endswith('.asc'): + foundSignedFiles = True + + print "Need to check %i files" % len(filesToCheck) + if not foundSignedFiles: + print '### Warning: No signed .asc files found' + + # set up http + parsed_uri = urlparse.urlparse(endpoint) + domain = parsed_uri.netloc + if parsed_uri.scheme == 'https': + conn = httplib.HTTPSConnection(domain) + else: + conn = httplib.HTTPConnection(domain) + #conn.set_debuglevel(5) + + drawProgressBar(0) + errors = [] + for idx, file in enumerate(filesToCheck): + request_uri = parsed_uri.path + file[len(localMavenRepo):] + conn.request("HEAD", request_uri) + res = conn.getresponse() + res.read() # useless call for head, but prevents httplib.ResponseNotReady raise + + absolute_url = parsed_uri.scheme + '://' + parsed_uri.netloc + request_uri + if res.status == 200: + content_length = res.getheader('content-length') + local_file_size = os.path.getsize(file) + if int(content_length) != int(local_file_size): + errors.append('LENGTH MISMATCH: %s differs in size. local %s <=> %s remote' % (absolute_url, content_length, local_file_size)) + elif res.status == 404: + errors.append('MISSING: %s' % absolute_url) + elif res.status == 301 or res.status == 302: + errors.append('REDIRECT: %s to %s' % (absolute_url, res.getheader('location'))) + else: + errors.append('ERROR: %s http response: %s %s' %(absolute_url, res.status, res.reason)) + + # update progressbar at the end + drawProgressBar((idx+1)/float(len(filesToCheck))) + + print + + if len(errors) != 0: + print 'The following errors occured (%s out of %s files)' % (len(errors), len(filesToCheck)) + print + for error in errors: + print error + sys.exit(-1)