[STORE] Expose ShardId via LeafReader rather than Direcotry API
Today we try to fetch a shard Id for a given IndexReader / LeafReader by walking it's tree until the lucene internal SegmentReader and then casting the directory into a StoreDirecotory. This class is fully internal to Elasticsearch and should not be exposed outside of the Store. This commit makes StoreDirectory a private inner class and adds dedicated ElasticsearchDirectoryReader / ElasticserachLeafReader exposing a ShardId getter to obtain information about the shard the index / segment belogs to. These classes can be used to expose other segment specific information in the future more easily.
This commit is contained in:
parent
a43259eba4
commit
8d7ce3c558
|
@ -1,68 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.common.lucene;
|
|
||||||
|
|
||||||
import org.apache.lucene.index.FilterLeafReader;
|
|
||||||
import org.apache.lucene.index.LeafReader;
|
|
||||||
import org.apache.lucene.index.SegmentReader;
|
|
||||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
|
||||||
import org.elasticsearch.common.Nullable;
|
|
||||||
|
|
||||||
public class SegmentReaderUtils {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tries to extract a segment reader from the given index reader.
|
|
||||||
* If no SegmentReader can be extracted an {@link org.elasticsearch.ElasticsearchIllegalStateException} is thrown.
|
|
||||||
*/
|
|
||||||
@Nullable
|
|
||||||
public static SegmentReader segmentReader(LeafReader reader) {
|
|
||||||
return internalSegmentReader(reader, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tries to extract a segment reader from the given index reader and returns it, otherwise <code>null</code>
|
|
||||||
* is returned
|
|
||||||
*/
|
|
||||||
@Nullable
|
|
||||||
public static SegmentReader segmentReaderOrNull(LeafReader reader) {
|
|
||||||
return internalSegmentReader(reader, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean registerCoreListener(LeafReader reader, SegmentReader.CoreClosedListener listener) {
|
|
||||||
reader.addCoreClosedListener(listener);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SegmentReader internalSegmentReader(LeafReader reader, boolean fail) {
|
|
||||||
if (reader == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (reader instanceof SegmentReader) {
|
|
||||||
return (SegmentReader) reader;
|
|
||||||
} else if (reader instanceof FilterLeafReader) {
|
|
||||||
final FilterLeafReader fReader = (FilterLeafReader) reader;
|
|
||||||
return segmentReader(FilterLeafReader.unwrap(fReader));
|
|
||||||
}
|
|
||||||
if (fail) {
|
|
||||||
// hard fail - we can't get a SegmentReader
|
|
||||||
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.common.lucene.index;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.index.FilterDirectoryReader;
|
||||||
|
import org.apache.lucene.index.FilterLeafReader;
|
||||||
|
import org.apache.lucene.index.LeafReader;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
|
||||||
|
* Elasticsearch internal per shard / index information like the shard ID.
|
||||||
|
*/
|
||||||
|
public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
|
||||||
|
|
||||||
|
private final ShardId shardId;
|
||||||
|
private final FilterDirectoryReader.SubReaderWrapper wrapper;
|
||||||
|
|
||||||
|
private ElasticsearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) {
|
||||||
|
super(in, wrapper);
|
||||||
|
this.wrapper = wrapper;
|
||||||
|
this.shardId = shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the shard id this index belongs to.
|
||||||
|
*/
|
||||||
|
public ShardId shardId() {
|
||||||
|
return this.shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
|
||||||
|
return new ElasticsearchDirectoryReader(in, wrapper, shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps the given reader in a {@link org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader} as
|
||||||
|
* well as all it's sub-readers in {@link org.elasticsearch.common.lucene.index.ElasticsearchLeafReader} to
|
||||||
|
* expose the given shard Id.
|
||||||
|
*
|
||||||
|
* @param reader the reader to wrap
|
||||||
|
* @param shardId the shard ID to expose via the elasticsearch internal reader wrappers.
|
||||||
|
*/
|
||||||
|
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) {
|
||||||
|
return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {
|
||||||
|
private final ShardId shardId;
|
||||||
|
SubReaderWrapper(ShardId shardId) {
|
||||||
|
this.shardId = shardId;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public LeafReader wrap(LeafReader reader) {
|
||||||
|
return new ElasticsearchLeafReader(reader, shardId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.common.lucene.index;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.DirectoryReader;
|
||||||
|
import org.apache.lucene.index.FilterDirectoryReader;
|
||||||
|
import org.apache.lucene.index.FilterLeafReader;
|
||||||
|
import org.apache.lucene.index.LeafReader;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link org.apache.lucene.index.FilterLeafReader} that exposes
|
||||||
|
* Elasticsearch internal per shard / index information like the shard ID.
|
||||||
|
*/
|
||||||
|
public final class ElasticsearchLeafReader extends FilterLeafReader {
|
||||||
|
|
||||||
|
private final ShardId shardId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Construct a FilterLeafReader based on the specified base reader.
|
||||||
|
* <p>Note that base reader is closed if this FilterLeafReader is closed.</p>
|
||||||
|
*
|
||||||
|
* @param in specified base reader.
|
||||||
|
*/
|
||||||
|
ElasticsearchLeafReader(LeafReader in, ShardId shardId) {
|
||||||
|
super(in);
|
||||||
|
this.shardId = shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the shard id this segment belongs to.
|
||||||
|
*/
|
||||||
|
public ShardId shardId() {
|
||||||
|
return this.shardId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.component.CloseableComponent;
|
import org.elasticsearch.common.component.CloseableComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
|
||||||
import org.elasticsearch.common.lucene.search.NoCacheFilter;
|
import org.elasticsearch.common.lucene.search.NoCacheFilter;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -130,7 +129,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
|
||||||
Cache<Filter, Value> filterToFbs = loadedFilters.get(coreCacheReader, new Callable<Cache<Filter, Value>>() {
|
Cache<Filter, Value> filterToFbs = loadedFilters.get(coreCacheReader, new Callable<Cache<Filter, Value>>() {
|
||||||
@Override
|
@Override
|
||||||
public Cache<Filter, Value> call() throws Exception {
|
public Cache<Filter, Value> call() throws Exception {
|
||||||
SegmentReaderUtils.registerCoreListener(context.reader(), BitsetFilterCache.this);
|
context.reader().addCoreClosedListener(BitsetFilterCache.this);
|
||||||
return CacheBuilder.newBuilder().build();
|
return CacheBuilder.newBuilder().build();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
|
||||||
import org.elasticsearch.common.lucene.docset.DocIdSets;
|
import org.elasticsearch.common.lucene.docset.DocIdSets;
|
||||||
import org.elasticsearch.common.lucene.search.CachedFilter;
|
import org.elasticsearch.common.lucene.search.CachedFilter;
|
||||||
import org.elasticsearch.common.lucene.search.NoCacheFilter;
|
import org.elasticsearch.common.lucene.search.NoCacheFilter;
|
||||||
|
@ -169,7 +168,7 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
|
||||||
Boolean previous = cache.seenReaders.putIfAbsent(context.reader().getCoreCacheKey(), Boolean.TRUE);
|
Boolean previous = cache.seenReaders.putIfAbsent(context.reader().getCoreCacheKey(), Boolean.TRUE);
|
||||||
if (previous == null) {
|
if (previous == null) {
|
||||||
// we add a core closed listener only, for non core IndexReaders we rely on clear being called (percolator for example)
|
// we add a core closed listener only, for non core IndexReaders we rely on clear being called (percolator for example)
|
||||||
SegmentReaderUtils.registerCoreListener(context.reader(), cache);
|
context.reader().addCoreClosedListener(cache);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// we can't pass down acceptedDocs provided, because we are caching the result, and acceptedDocs
|
// we can't pass down acceptedDocs provided, because we are caching the result, and acceptedDocs
|
||||||
|
|
|
@ -20,19 +20,8 @@
|
||||||
package org.elasticsearch.index.engine.internal;
|
package org.elasticsearch.index.engine.internal;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.*;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
|
||||||
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
|
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
|
||||||
import org.apache.lucene.index.IndexWriterConfig;
|
|
||||||
import org.apache.lucene.index.LeafReader;
|
|
||||||
import org.apache.lucene.index.LeafReaderContext;
|
|
||||||
import org.apache.lucene.index.LiveIndexWriterConfig;
|
|
||||||
import org.apache.lucene.index.MergePolicy;
|
|
||||||
import org.apache.lucene.index.MultiReader;
|
|
||||||
import org.apache.lucene.index.SegmentCommitInfo;
|
|
||||||
import org.apache.lucene.index.SegmentInfos;
|
|
||||||
import org.apache.lucene.index.SegmentReader;
|
|
||||||
import org.apache.lucene.index.Term;
|
|
||||||
import org.apache.lucene.search.FilteredQuery;
|
import org.apache.lucene.search.FilteredQuery;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
|
@ -54,7 +43,7 @@ import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.lucene.LoggerInfoStream;
|
import org.elasticsearch.common.lucene.LoggerInfoStream;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.common.lucene.uid.Versions;
|
import org.elasticsearch.common.lucene.uid.Versions;
|
||||||
import org.elasticsearch.common.math.MathUtils;
|
import org.elasticsearch.common.math.MathUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -1208,8 +1197,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getReaderRamBytesUsed(LeafReaderContext reader) {
|
private static long getReaderRamBytesUsed(LeafReaderContext reader) {
|
||||||
final SegmentReader segmentReader = SegmentReaderUtils.segmentReader(reader.reader());
|
return segmentReader(reader.reader()).ramBytesUsed();
|
||||||
return segmentReader.ramBytesUsed();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1239,8 +1227,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
Searcher searcher = acquireSearcher("segments");
|
Searcher searcher = acquireSearcher("segments");
|
||||||
try {
|
try {
|
||||||
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
for (LeafReaderContext reader : searcher.reader().leaves()) {
|
||||||
assert reader.reader() instanceof SegmentReader;
|
SegmentCommitInfo info = segmentReader(reader.reader()).getSegmentInfo();
|
||||||
SegmentCommitInfo info = SegmentReaderUtils.segmentReader(reader.reader()).getSegmentInfo();
|
|
||||||
assert !segments.containsKey(info.info.name);
|
assert !segments.containsKey(info.info.name);
|
||||||
Segment segment = new Segment(info.info.name);
|
Segment segment = new Segment(info.info.name);
|
||||||
segment.search = true;
|
segment.search = true;
|
||||||
|
@ -1425,7 +1412,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
*/
|
*/
|
||||||
private static boolean isMergedSegment(LeafReader reader) {
|
private static boolean isMergedSegment(LeafReader reader) {
|
||||||
// We expect leaves to be segment readers
|
// We expect leaves to be segment readers
|
||||||
final Map<String, String> diagnostics = SegmentReaderUtils.segmentReader(reader).getSegmentInfo().info.getDiagnostics();
|
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics();
|
||||||
final String source = diagnostics.get(IndexWriter.SOURCE);
|
final String source = diagnostics.get(IndexWriter.SOURCE);
|
||||||
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH, IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
|
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH, IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
|
||||||
return IndexWriter.SOURCE_MERGE.equals(source);
|
return IndexWriter.SOURCE_MERGE.equals(source);
|
||||||
|
@ -1546,7 +1533,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
}
|
}
|
||||||
|
|
||||||
private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException {
|
private SearcherManager buildSearchManager(IndexWriter indexWriter) throws IOException {
|
||||||
return new SearcherManager(indexWriter, true, searcherFactory);
|
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter, true), shardId);
|
||||||
|
return new SearcherManager(directoryReader, searcherFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
class EngineSearcher implements Searcher {
|
class EngineSearcher implements Searcher {
|
||||||
|
@ -1814,4 +1802,19 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
throw new UnsupportedOperationException("NoOpLock can't provide a condition");
|
throw new UnsupportedOperationException("NoOpLock can't provide a condition");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to extract a segment reader from the given index reader.
|
||||||
|
* If no SegmentReader can be extracted an {@link org.elasticsearch.ElasticsearchIllegalStateException} is thrown.
|
||||||
|
*/
|
||||||
|
private static SegmentReader segmentReader(LeafReader reader) {
|
||||||
|
if (reader instanceof SegmentReader) {
|
||||||
|
return (SegmentReader) reader;
|
||||||
|
} else if (reader instanceof FilterLeafReader) {
|
||||||
|
final FilterLeafReader fReader = (FilterLeafReader) reader;
|
||||||
|
return segmentReader(FilterLeafReader.unwrap(fReader));
|
||||||
|
}
|
||||||
|
// hard fail - we can't get a SegmentReader
|
||||||
|
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,47 +19,65 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.shard;
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.lucene.index.LeafReader;
|
import org.apache.lucene.index.*;
|
||||||
import org.apache.lucene.index.IndexReader;
|
|
||||||
import org.apache.lucene.index.SegmentReader;
|
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
import org.elasticsearch.index.store.DirectoryUtils;
|
import org.elasticsearch.common.lucene.index.ElasticsearchLeafReader;
|
||||||
import org.elasticsearch.index.store.Store;
|
|
||||||
|
|
||||||
/**
|
public final class ShardUtils {
|
||||||
*/
|
|
||||||
public class ShardUtils {
|
private ShardUtils() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tries to extract the shard id from a reader if possible, when its not possible,
|
* Tries to extract the shard id from a reader if possible, when its not possible,
|
||||||
* will return null. This method requires the reader to be a {@link SegmentReader}
|
* will return null.
|
||||||
* and the directory backing it to be {@link org.elasticsearch.index.store.Store.StoreDirectory}.
|
|
||||||
* This will be the case in almost all cases, except for percolator currently.
|
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public static ShardId extractShardId(LeafReader reader) {
|
public static ShardId extractShardId(LeafReader reader) {
|
||||||
return extractShardId(SegmentReaderUtils.segmentReaderOrNull(reader));
|
final ElasticsearchLeafReader esReader = getElasticsearchLeafReader(reader);
|
||||||
}
|
if (esReader != null) {
|
||||||
|
assert reader.getRefCount() > 0 : "ElasticsearchLeafReader is already closed";
|
||||||
@Nullable
|
return esReader.shardId();
|
||||||
private static ShardId extractShardId(SegmentReader reader) {
|
|
||||||
if (reader != null) {
|
|
||||||
assert reader.getRefCount() > 0 : "SegmentReader is already closed";
|
|
||||||
// reader.directory doesn't call ensureOpen for internal reasons.
|
|
||||||
Store.StoreDirectory storeDir = DirectoryUtils.getStoreDirectory(reader.directory());
|
|
||||||
if (storeDir != null) {
|
|
||||||
return storeDir.shardId();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to extract the shard id from a reader if possible, when its not possible,
|
||||||
|
* will return null.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
public static ShardId extractShardId(IndexReader reader) {
|
public static ShardId extractShardId(IndexReader reader) {
|
||||||
|
final ElasticsearchDirectoryReader esReader = getElasticsearchDirectoryReader(reader);
|
||||||
|
if (esReader != null) {
|
||||||
|
return esReader.shardId();
|
||||||
|
}
|
||||||
if (!reader.leaves().isEmpty()) {
|
if (!reader.leaves().isEmpty()) {
|
||||||
return extractShardId(reader.leaves().get(0).reader());
|
return extractShardId(reader.leaves().get(0).reader());
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ElasticsearchLeafReader getElasticsearchLeafReader(LeafReader reader) {
|
||||||
|
if (reader instanceof FilterLeafReader) {
|
||||||
|
if (reader instanceof ElasticsearchLeafReader) {
|
||||||
|
return (ElasticsearchLeafReader) reader;
|
||||||
|
} else {
|
||||||
|
return getElasticsearchLeafReader(FilterLeafReader.unwrap(reader));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ElasticsearchDirectoryReader getElasticsearchDirectoryReader(IndexReader reader) {
|
||||||
|
if (reader instanceof FilterDirectoryReader) {
|
||||||
|
if (reader instanceof ElasticsearchDirectoryReader) {
|
||||||
|
return (ElasticsearchDirectoryReader) reader;
|
||||||
|
} else {
|
||||||
|
return null; // lucene needs a getDelegate method on FilteredDirectoryReader - not a big deal here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -805,7 +805,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
||||||
for (String storeFile : store.directory().listAll()) {
|
for (String storeFile : store.directory().listAll()) {
|
||||||
if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) {
|
if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) {
|
||||||
try {
|
try {
|
||||||
store.logDeleteFile("restore", storeFile);
|
store.deleteFile("restore", storeFile);
|
||||||
store.directory().deleteFile(storeFile);
|
store.directory().deleteFile(storeFile);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// ignore
|
// ignore
|
||||||
|
|
|
@ -31,25 +31,6 @@ public final class DirectoryUtils {
|
||||||
|
|
||||||
private DirectoryUtils() {} // no instance
|
private DirectoryUtils() {} // no instance
|
||||||
|
|
||||||
/**
|
|
||||||
* Try and extract a store directory out of a directory, tries to take into
|
|
||||||
* account the fact that a directory is a filter directory, and/or a compound dir.
|
|
||||||
*/
|
|
||||||
@Nullable
|
|
||||||
public static Store.StoreDirectory getStoreDirectory(Directory dir) {
|
|
||||||
Directory current = dir;
|
|
||||||
while (true) {
|
|
||||||
if (current instanceof Store.StoreDirectory) {
|
|
||||||
return (Store.StoreDirectory) current;
|
|
||||||
}
|
|
||||||
if (current instanceof FilterDirectory) {
|
|
||||||
current = ((FilterDirectory) current).getDelegate();
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static final Directory getLeafDirectory(FilterDirectory dir) {
|
static final Directory getLeafDirectory(FilterDirectory dir) {
|
||||||
Directory current = dir.getDelegate();
|
Directory current = dir.getDelegate();
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
|
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
this.directoryService = directoryService;
|
this.directoryService = directoryService;
|
||||||
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor));
|
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
|
||||||
this.shardLock = shardLock;
|
this.shardLock = shardLock;
|
||||||
assert shardLock != null;
|
assert shardLock != null;
|
||||||
assert shardLock.getShardId().equals(shardId);
|
assert shardLock.getShardId().equals(shardId);
|
||||||
|
@ -547,12 +547,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
failIfCorrupted();
|
failIfCorrupted();
|
||||||
metadataLock.writeLock().lock();
|
metadataLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
final Directory dir = directory();
|
final StoreDirectory dir = directory;
|
||||||
for (String existingFile : dir.listAll()) {
|
for (String existingFile : dir.listAll()) {
|
||||||
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
|
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
|
||||||
if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
|
if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
|
||||||
try {
|
try {
|
||||||
logDeleteFile(reason, existingFile);
|
dir.deleteFile(reason, existingFile);
|
||||||
dir.deleteFile(existingFile);
|
dir.deleteFile(existingFile);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// ignore, we don't really care, will get deleted later on
|
// ignore, we don't really care, will get deleted later on
|
||||||
|
@ -603,21 +603,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private static final class StoreDirectory extends FilterDirectory {
|
||||||
* This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way?
|
|
||||||
*/
|
|
||||||
public final class StoreDirectory extends FilterDirectory {
|
|
||||||
|
|
||||||
public final ESLogger deletesLogger;
|
private final ESLogger deletesLogger;
|
||||||
|
|
||||||
StoreDirectory(Directory delegateDirectory) throws IOException {
|
StoreDirectory(Directory delegateDirectory, ESLogger deletesLogger) throws IOException {
|
||||||
super(delegateDirectory);
|
super(delegateDirectory);
|
||||||
deletesLogger = Loggers.getLogger("index.store.deletes", indexSettings, shardId);
|
this.deletesLogger = deletesLogger;
|
||||||
}
|
|
||||||
|
|
||||||
public ShardId shardId() {
|
|
||||||
ensureOpen();
|
|
||||||
return Store.this.shardId();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -625,10 +617,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
assert false : "Nobody should close this directory except of the Store itself";
|
assert false : "Nobody should close this directory except of the Store itself";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void deleteFile(String msg, String name) throws IOException {
|
||||||
|
deletesLogger.trace("{}: delete file {}", msg, name);
|
||||||
|
super.deleteFile(name);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteFile(String name) throws IOException {
|
public void deleteFile(String name) throws IOException {
|
||||||
logDeleteFile("StoreDirectory.deleteFile", name);
|
deleteFile("StoreDirectory.deleteFile", name);
|
||||||
super.deleteFile(name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerClose() throws IOException {
|
private void innerClose() throws IOException {
|
||||||
|
@ -642,17 +638,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Log that we are about to delete this file, to the index.store.deletes component. */
|
/** Log that we are about to delete this file, to the index.store.deletes component. */
|
||||||
public void logDeleteFile(String message, String fileName) {
|
public void deleteFile(String msg, String storeFile) throws IOException {
|
||||||
logDeleteFile(directory(), message, fileName);
|
directory.deleteFile(msg, storeFile);
|
||||||
}
|
|
||||||
|
|
||||||
/** Log that we are about to delete this file, to the index.store.deletes component. */
|
|
||||||
public static void logDeleteFile(Directory dir, String message, String fileName) {
|
|
||||||
assert dir instanceof StoreDirectory;
|
|
||||||
if (dir instanceof StoreDirectory) {
|
|
||||||
((StoreDirectory) dir).deletesLogger.trace("{}: delete file {}", message, fileName);
|
|
||||||
}
|
|
||||||
// else what to do...?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -169,7 +168,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||||
final Accountable accountable = cache.get(key, new Callable<AtomicFieldData>() {
|
final Accountable accountable = cache.get(key, new Callable<AtomicFieldData>() {
|
||||||
@Override
|
@Override
|
||||||
public AtomicFieldData call() throws Exception {
|
public AtomicFieldData call() throws Exception {
|
||||||
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
|
context.reader().addCoreClosedListener(IndexFieldCache.this);
|
||||||
|
|
||||||
key.listeners.add(indicesFieldDataCacheListener);
|
key.listeners.add(indicesFieldDataCacheListener);
|
||||||
final ShardId shardId = ShardUtils.extractShardId(context.reader());
|
final ShardId shardId = ShardUtils.extractShardId(context.reader());
|
||||||
|
|
|
@ -65,6 +65,7 @@ import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
|
||||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.shard.ShardUtils;
|
||||||
import org.elasticsearch.index.similarity.SimilarityService;
|
import org.elasticsearch.index.similarity.SimilarityService;
|
||||||
import org.elasticsearch.index.store.DirectoryService;
|
import org.elasticsearch.index.store.DirectoryService;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
|
@ -1417,4 +1418,13 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
protected Term newUid(String id) {
|
protected Term newUid(String id) {
|
||||||
return new Term("_uid", id);
|
return new Term("_uid", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExtractShardId() {
|
||||||
|
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
|
||||||
|
ShardId shardId = ShardUtils.extractShardId(test.reader());
|
||||||
|
assertNotNull(shardId);
|
||||||
|
assertEquals(shardId, engine.shardId());
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.document.StringField;
|
||||||
|
import org.apache.lucene.index.*;
|
||||||
|
import org.apache.lucene.store.BaseDirectoryWrapper;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||||
|
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class ShardUtilsTests extends ElasticsearchLuceneTestCase {
|
||||||
|
|
||||||
|
public void testExtractShardId() throws IOException {
|
||||||
|
BaseDirectoryWrapper dir = newDirectory();
|
||||||
|
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
|
||||||
|
writer.commit();
|
||||||
|
ShardId id = new ShardId("foo", random().nextInt());
|
||||||
|
try (DirectoryReader reader = DirectoryReader.open(writer, random().nextBoolean())) {
|
||||||
|
ElasticsearchDirectoryReader wrap = ElasticsearchDirectoryReader.wrap(reader, id);
|
||||||
|
assertEquals(id, ShardUtils.extractShardId(wrap));
|
||||||
|
}
|
||||||
|
final int numDocs = 1 + random().nextInt(5);
|
||||||
|
for (int i = 0; i < numDocs; i++) {
|
||||||
|
Document d = new Document();
|
||||||
|
d.add(newField("name", "foobar", StringField.TYPE_STORED));
|
||||||
|
writer.addDocument(d);
|
||||||
|
if (random().nextBoolean()) {
|
||||||
|
writer.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (DirectoryReader reader = DirectoryReader.open(writer, random().nextBoolean())) {
|
||||||
|
ElasticsearchDirectoryReader wrap = ElasticsearchDirectoryReader.wrap(reader, id);
|
||||||
|
assertEquals(id, ShardUtils.extractShardId(wrap));
|
||||||
|
CompositeReaderContext context = wrap.getContext();
|
||||||
|
for (LeafReaderContext leaf : context.leaves()) {
|
||||||
|
assertEquals(id, ShardUtils.extractShardId(leaf.reader()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IOUtils.close(writer, dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue