[CORE] Refactor InternalEngine into AbstractEngine and classes

InternalEngine contains a number of inner classes that it uses, however,
this makes the class overly large and hard to extend. In order to be
able to easily add other Engines (such as the ShadowEngine), these
helping methods have been extracted into an AbstractEngine class. The
classes that were previously in `InternalEngine` have been moved to
separate classes, which will allow for better unit testing as well.

None of the functionality of InternalEngine has been changed, this is
only refactoring.

Note that this is a change I originally made on my shadow-replica
branch, however it is easier to review piecemeal so I extracted it into
a separate PR.
This commit is contained in:
Lee Hinman 2015-02-05 11:51:18 -07:00
parent dcc15a6460
commit 622d2c8e42
25 changed files with 516 additions and 368 deletions

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.engine.EngineException;
import java.util.concurrent.locks.Lock;
/**
* Releasable lock used inside of Engine implementations
*/
public class ReleasableLock implements Releasable {
private final Lock lock;
public ReleasableLock(Lock lock) {
this.lock = lock;
}
@Override
public void close() {
lock.unlock();
}
public ReleasableLock acquire() throws EngineException {
lock.lock();
return this;
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;

View File

@ -19,20 +19,24 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -42,24 +46,140 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*
*/
public interface Engine extends Closeable {
public abstract class Engine implements Closeable {
void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
private final ESLogger logger;
private final EngineConfig engineConfig;
void create(Create create) throws EngineException;
protected Engine(EngineConfig engineConfig) {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");
void index(Index index) throws EngineException;
this.engineConfig = engineConfig;
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), engineConfig.getShardId());
}
void delete(Delete delete) throws EngineException;
/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
protected static long guardedRamBytesUsed(Accountable a) {
if (a == null) {
return 0;
}
return a.ramBytesUsed();
}
void delete(DeleteByQuery delete) throws EngineException;
/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link org.elasticsearch.ElasticsearchIllegalStateException} is thrown.
*/
protected static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}
GetResult get(Get get) throws EngineException;
/**
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
*/
protected static boolean isMergedSegment(LeafReader reader) {
// We expect leaves to be segment readers
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics();
final String source = diagnostics.get(IndexWriter.SOURCE);
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH,
IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
return IndexWriter.SOURCE_MERGE.equals(source);
}
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger);
}
/** A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
*/
protected static final class IndexThrottle {
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
private volatile ReleasableLock lock = NOOP_LOCK;
public Releasable acquireThrottle() {
return lock.acquire();
}
/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";
lock = lockReference;
}
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";
lock = NOOP_LOCK;
}
}
/** A Lock implementation that always allows the lock to be acquired */
protected static final class NoOpLock implements Lock {
@Override
public void lock() {
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return true;
}
@Override
public void unlock() {
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("NoOpLock can't provide a condition");
}
}
public abstract void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
public abstract void create(Create create) throws EngineException;
public abstract void index(Index index) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
public abstract void delete(DeleteByQuery delete) throws EngineException;
public abstract GetResult get(Get get) throws EngineException;
/**
* Returns a new searcher instance. The consumer of this
@ -68,28 +188,28 @@ public interface Engine extends Closeable {
*
* @see Searcher#close()
*/
Searcher acquireSearcher(String source) throws EngineException;
public abstract Searcher acquireSearcher(String source) throws EngineException;
/**
* Global stats on segments.
*/
SegmentsStats segmentsStats();
public abstract SegmentsStats segmentsStats();
/**
* The list of segments in the engine.
*/
List<Segment> segments(boolean verbose);
public abstract List<Segment> segments(boolean verbose);
/**
* Returns <tt>true</tt> if a refresh is really needed.
*/
boolean refreshNeeded();
public abstract boolean refreshNeeded();
/**
* Refreshes the engine for new search operations to reflect the latest
* changes.
*/
void refresh(String source) throws EngineException;
public abstract void refresh(String source) throws EngineException;
/**
* Flushes the state of the engine including the transaction log, clearing memory.
@ -97,7 +217,7 @@ public interface Engine extends Closeable {
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
*/
void flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;
/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
@ -105,32 +225,32 @@ public interface Engine extends Closeable {
* This operation is not going to block if another flush operation is currently running and won't write
* a lucene commit if nothing needs to be committed.
*/
void flush() throws EngineException;
public abstract void flush() throws EngineException;
/**
* Optimizes to 1 segment
*/
void forceMerge(boolean flush, boolean waitForMerge);
abstract void forceMerge(boolean flush, boolean waitForMerge);
/**
* Triggers a forced merge on this engine
*/
void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
public abstract void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*/
SnapshotIndexCommit snapshotIndex() throws EngineException;
public abstract SnapshotIndexCommit snapshotIndex() throws EngineException;
void recover(RecoveryHandler recoveryHandler) throws EngineException;
public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
/** fail engine due to some error. the engine will also be closed. */
void failEngine(String reason, Throwable failure);
public abstract void failEngine(String reason, Throwable failure);
ByteSizeValue indexingBufferSize();
public abstract ByteSizeValue indexingBufferSize();
static interface FailedEngineListener {
public static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
}
@ -146,7 +266,7 @@ public interface Engine extends Closeable {
* <p>The last phase returns the remaining transaction log. During this phase, no dirty
* operations are allowed on the index.
*/
static interface RecoveryHandler {
public static interface RecoveryHandler {
void phase1(SnapshotIndexCommit snapshot) throws ElasticsearchException;
@ -155,50 +275,38 @@ public interface Engine extends Closeable {
void phase3(Translog.Snapshot snapshot) throws ElasticsearchException;
}
static interface Searcher extends Releasable {
/**
* The source that caused this searcher to be acquired.
*/
String source();
IndexReader reader();
IndexSearcher searcher();
}
static class SimpleSearcher implements Searcher {
public static class Searcher implements Releasable {
private final String source;
private final IndexSearcher searcher;
public SimpleSearcher(String source, IndexSearcher searcher) {
public Searcher(String source, IndexSearcher searcher) {
this.source = source;
this.searcher = searcher;
}
@Override
/**
* The source that caused this searcher to be acquired.
*/
public String source() {
return source;
}
@Override
public IndexReader reader() {
return searcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return searcher;
}
@Override
public void close() throws ElasticsearchException {
// nothing to release here...
// Nothing to close here
}
}
static interface Operation {
public static interface Operation {
static enum Type {
CREATE,
INDEX,
@ -216,7 +324,7 @@ public interface Engine extends Closeable {
Origin origin();
}
static abstract class IndexingOperation implements Operation {
public static abstract class IndexingOperation implements Operation {
private final DocumentMapper docMapper;
private final Term uid;
@ -329,7 +437,7 @@ public interface Engine extends Closeable {
}
}
static final class Create extends IndexingOperation {
public static final class Create extends IndexingOperation {
private final boolean autoGeneratedId;
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates, boolean autoGeneratedId) {
@ -357,7 +465,7 @@ public interface Engine extends Closeable {
}
}
static final class Index extends IndexingOperation {
public static final class Index extends IndexingOperation {
private boolean created;
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
@ -389,7 +497,7 @@ public interface Engine extends Closeable {
}
}
static class Delete implements Operation {
public static class Delete implements Operation {
private final String type;
private final String id;
private final Term uid;
@ -481,7 +589,7 @@ public interface Engine extends Closeable {
}
}
static class DeleteByQuery {
public static class DeleteByQuery {
private final Query query;
private final BytesReference source;
private final String[] filteringAliases;
@ -557,7 +665,7 @@ public interface Engine extends Closeable {
}
static class Get {
public static class Get {
private final boolean realtime;
private final Term uid;
private boolean loadSource = true;
@ -605,7 +713,7 @@ public interface Engine extends Closeable {
}
}
static class GetResult {
public static class GetResult {
private final boolean exists;
private final long version;
private final Translog.Source source;

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Searcher for an Engine
*/
public class EngineSearcher extends Engine.Searcher {
private final SearcherManager manager;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Store store;
private final ESLogger logger;
public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, ESLogger logger) {
super(source, searcher);
this.manager = manager;
this.store = store;
this.logger = logger;
}
@Override
public void close() throws ElasticsearchException {
if (!released.compareAndSet(false, true)) {
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
* of time, this is why we only log a warning instead of throwing an exception.
*/
logger.warn("Searcher was released twice", new ElasticsearchIllegalStateException("Double release"));
return;
}
try {
manager.release(this.searcher());
} catch (IOException e) {
throw new ElasticsearchIllegalStateException("Cannot close", e);
} catch (AlreadyClosedException e) {
/* this one can happen if we already closed the
* underlying store / directory and we call into the
* IndexWriter to free up pending files. */
} finally {
store.decRef();
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.elasticsearch.index.engine.EngineConfig;
import java.io.IOException;
/**
* Basic Searcher factory that allows returning an {@link IndexSearcher}
* given an {@link IndexReader}
*/
public class EngineSearcherFactory extends SearcherFactory {
private final EngineConfig engineConfig;
public EngineSearcherFactory(EngineConfig engineConfig) {
this.engineConfig = engineConfig;
}
@Override
public IndexSearcher newSearcher(IndexReader reader) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
searcher.setSimilarity(engineConfig.getSimilarity());
return searcher;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.store.Store;
/**
* A special {@link RecoveryCounter} that flushes the engine when all
* recoveries have completed
*/
public final class FlushingRecoveryCounter extends RecoveryCounter {
private final Engine engine;
private final ESLogger logger;
FlushingRecoveryCounter(Engine engine, Store store, ESLogger logger) {
super(store);
this.engine = engine;
this.logger = logger;
}
int endRecovery() throws ElasticsearchException {
int left = super.endRecovery();
if (left == 0) {
try {
engine.flush();
} catch (IllegalIndexShardStateException|FlushNotAllowedEngineException e) {
// we are being closed, or in created state, ignore
// OR, we are not allowed to perform flush, ignore
} catch (Throwable e) {
logger.warn("failed to flush shard post recovery", e);
}
}
return left;
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
@ -25,16 +25,13 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
@ -47,9 +44,9 @@ import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.OnGoingMerge;
@ -57,7 +54,6 @@ import org.elasticsearch.index.merge.policy.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -67,11 +63,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -79,7 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
*/
public class InternalEngine implements Engine {
public class InternalEngine extends Engine {
protected final ESLogger logger;
protected final ShardId shardId;
@ -101,12 +95,12 @@ public class InternalEngine implements Engine {
private final MergeSchedulerProvider mergeScheduler;
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final InternalLock readLock = new InternalLock(rwl.readLock());
private final InternalLock writeLock = new InternalLock(rwl.writeLock());
private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
private final IndexWriter indexWriter;
private final SearcherFactory searcherFactory = new SearchFactory();
private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@ -118,7 +112,7 @@ public class InternalEngine implements Engine {
private final AtomicInteger flushing = new AtomicInteger();
private final Lock flushLock = new ReentrantLock();
protected final RecoveryCounter onGoingRecoveries = new RecoveryCounter();
protected final FlushingRecoveryCounter onGoingRecoveries;
// A uid (in the form of BytesRef) to the version map
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final LiveVersionMap versionMap;
@ -136,18 +130,17 @@ public class InternalEngine implements Engine {
private final IndexThrottle throttle;
public InternalEngine(EngineConfig engineConfig) throws EngineException {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getDeletionPolicy(), "Snapshot deletion policy must be provided to the engine");
Preconditions.checkNotNull(engineConfig.getTranslog(), "Translog must be provided to the engine");
super(engineConfig);
this.store = engineConfig.getStore();
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.versionMap = new LiveVersionMap();
store.incRef();
IndexWriter writer = null;
SearcherManager manager = null;
boolean success = false;
try {
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.onGoingRecoveries = new FlushingRecoveryCounter(this, store, logger);
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
@ -163,6 +156,7 @@ public class InternalEngine implements Engine {
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.engineConfig = engineConfig;
this.searcherFactory = new SearchFactory(engineConfig);
listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) {
@Override
protected void onChange() {
@ -196,7 +190,7 @@ public class InternalEngine implements Engine {
@Override
public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
ByteSizeValue preValue = engineConfig.getIndexingBufferSize();
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
engineConfig.setIndexingBufferSize(indexingBufferSize);
indexWriter.getConfig().setRAMBufferSizeMB(indexingBufferSize.mbFrac());
@ -273,7 +267,7 @@ public class InternalEngine implements Engine {
@Override
public GetResult get(Get get) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
@ -326,7 +320,7 @@ public class InternalEngine implements Engine {
@Override
public void create(Create create) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
if (create.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
@ -432,7 +426,7 @@ public class InternalEngine implements Engine {
@Override
public void index(Index index) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
if (index.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
@ -531,7 +525,7 @@ public class InternalEngine implements Engine {
@Override
public void delete(Delete delete) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
innerDelete(delete);
flushNeeded = true;
@ -598,7 +592,7 @@ public class InternalEngine implements Engine {
@Override
public void delete(DeleteByQuery delete) throws EngineException {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
Query query;
if (delete.nested() && delete.aliasFilter() != null) {
@ -659,10 +653,6 @@ public class InternalEngine implements Engine {
}
}
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
return new EngineSearcher(source, searcher, manager);
}
@Override
public boolean refreshNeeded() {
if (store.tryIncRef()) {
@ -689,7 +679,7 @@ public class InternalEngine implements Engine {
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
@ -735,7 +725,7 @@ public class InternalEngine implements Engine {
flushLock.lock();
try {
if (commitTranslog) {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
@ -776,7 +766,7 @@ public class InternalEngine implements Engine {
// note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
// translog on an index that was opened on a committed point in time that is "in the future"
// of that translog
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
// we allow to *just* commit if there is an ongoing recovery happening...
// its ok to use this, only a flush will cause a new translogId, and we are locked here from
@ -798,7 +788,7 @@ public class InternalEngine implements Engine {
}
// reread the last committed segment infos
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
@ -872,7 +862,7 @@ public class InternalEngine implements Engine {
@Override
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
/*
* The way we implement upgrades is a bit hackish in the sense that we set an instance
@ -928,7 +918,7 @@ public class InternalEngine implements Engine {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
flush(false, false, true);
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
return deletionPolicy.snapshot();
} catch (IOException e) {
@ -940,7 +930,7 @@ public class InternalEngine implements Engine {
public void recover(RecoveryHandler recoveryHandler) throws EngineException {
// take a write lock here so it won't happen while a flush is in progress
// this means that next commits will not be allowed once the lock is released
try (InternalLock _ = writeLock.acquire()) {
try (ReleasableLock _ = writeLock.acquire()) {
ensureOpen();
onGoingRecoveries.startRecovery();
}
@ -1020,13 +1010,6 @@ public class InternalEngine implements Engine {
return t;
}
private long guardedRamBytesUsed(Accountable a) {
if (a == null) {
return 0;
}
return a.ramBytesUsed();
}
@Override
public SegmentsStats segmentsStats() {
ensureOpen();
@ -1050,7 +1033,7 @@ public class InternalEngine implements Engine {
@Override
public List<Segment> segments(boolean verbose) {
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
Map<String, Segment> segments = new HashMap<>();
@ -1136,7 +1119,7 @@ public class InternalEngine implements Engine {
@Override
public void close() throws ElasticsearchException {
logger.debug("close now acquire writeLock");
try (InternalLock _ = writeLock.acquire()) {
try (ReleasableLock _ = writeLock.acquire()) {
logger.debug("close acquired writeLock");
if (isClosed.compareAndSet(false, true)) {
try {
@ -1202,7 +1185,7 @@ public class InternalEngine implements Engine {
logger.warn("failEngine threw exception", t);
} finally {
closedOrFailed = true;
try (InternalLock _ = readLock.acquire()) {
try (ReleasableLock _ = readLock.acquire()) {
// we take the readlock here to ensure nobody replaces this IW concurrently.
indexWriter.rollback();
} catch (Throwable t) {
@ -1230,17 +1213,6 @@ public class InternalEngine implements Engine {
}
}
/**
* Returns whether a leaf reader comes from a merge (versus flush or addIndexes).
*/
private static boolean isMergedSegment(LeafReader reader) {
// We expect leaves to be segment readers
final Map<String, String> diagnostics = segmentReader(reader).getSegmentInfo().info.getDiagnostics();
final String source = diagnostics.get(IndexWriter.SOURCE);
assert Arrays.asList(IndexWriter.SOURCE_ADDINDEXES_READERS, IndexWriter.SOURCE_FLUSH, IndexWriter.SOURCE_MERGE).contains(source) : "Unknown source " + source;
return IndexWriter.SOURCE_MERGE.equals(source);
}
private IndexWriter createWriter() throws IOException {
try {
boolean create = !Lucene.indexExists(store.directory());
@ -1274,7 +1246,7 @@ public class InternalEngine implements Engine {
try {
assert isMergedSegment(reader);
if (warmer != null) {
final Engine.Searcher searcher = new SimpleSearcher("warmer", new IndexSearcher(reader));
final Engine.Searcher searcher = new Searcher("warmer", new IndexSearcher(reader));
final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
warmer.warmNewReaders(context);
}
@ -1298,60 +1270,13 @@ public class InternalEngine implements Engine {
}
}
class EngineSearcher implements Searcher {
private final String source;
private final IndexSearcher searcher;
private final SearcherManager manager;
private final AtomicBoolean released = new AtomicBoolean(false);
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
class SearchFactory extends EngineSearcherFactory {
private EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
this.source = source;
this.searcher = searcher;
this.manager = manager;
SearchFactory(EngineConfig engineConfig) {
super(engineConfig);
}
@Override
public String source() {
return this.source;
}
@Override
public IndexReader reader() {
return searcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return searcher;
}
@Override
public void close() throws ElasticsearchException {
if (!released.compareAndSet(false, true)) {
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
* of time, this is why we only log a warning instead of throwing an exception.
*/
logger.warn("Searcher was released twice", new ElasticsearchIllegalStateException("Double release"));
return;
}
try {
manager.release(searcher);
} catch (IOException e) {
throw new ElasticsearchIllegalStateException("Cannot close", e);
} catch (AlreadyClosedException e) {
/* this one can happen if we already closed the
* underlying store / directory and we call into the
* IndexWriter to free up pending files. */
} finally {
store.decRef();
}
}
}
class SearchFactory extends SearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader) throws IOException {
IndexSearcher searcher = new IndexSearcher(reader);
@ -1394,10 +1319,10 @@ public class InternalEngine implements Engine {
}
if (newSearcher != null) {
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new SimpleSearcher("warmer", newSearcher));
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", newSearcher));
warmer.warmNewReaders(context);
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new SimpleSearcher("warmer", searcher)));
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
} catch (Throwable e) {
if (closedOrFailed == false) {
logger.warn("failed to prepare/warm", e);
@ -1413,59 +1338,6 @@ public class InternalEngine implements Engine {
}
}
protected final class RecoveryCounter implements Releasable {
private final AtomicInteger onGoingRecoveries = new AtomicInteger();
void startRecovery() {
store.incRef();
onGoingRecoveries.incrementAndGet();
}
public int get() {
return onGoingRecoveries.get();
}
void endRecovery() throws ElasticsearchException {
store.decRef();
int left = onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
if (left == 0) {
try {
flush();
} catch (IllegalIndexShardStateException e) {
// we are being closed, or in created state, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Throwable e) {
logger.warn("failed to flush shard post recovery", e);
}
}
}
@Override
public void close() throws ElasticsearchException {
endRecovery();
}
}
private static final class InternalLock implements Releasable {
private final Lock lock;
InternalLock(Lock lock) {
this.lock = lock;
}
@Override
public void close() {
lock.unlock();
}
InternalLock acquire() throws EngineException {
lock.lock();
return this;
}
}
public void activateThrottling() {
throttle.activate();
}
@ -1474,74 +1346,6 @@ public class InternalEngine implements Engine {
throttle.deactivate();
}
static final class IndexThrottle {
private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
private volatile InternalLock lock = NOOP_LOCK;
public Releasable acquireThrottle() {
return lock.acquire();
}
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";
lock = lockReference;
}
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";
lock = NOOP_LOCK;
}
}
private static final class NoOpLock implements Lock {
@Override
public void lock() {
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return true;
}
@Override
public void unlock() {
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("NoOpLock can't provide a condition");
}
}
/**
* Tries to extract a segment reader from the given index reader.
* If no SegmentReader can be extracted an {@link org.elasticsearch.ElasticsearchIllegalStateException} is thrown.
*/
private static SegmentReader segmentReader(LeafReader reader) {
if (reader instanceof SegmentReader) {
return (SegmentReader) reader;
} else if (reader instanceof FilterLeafReader) {
final FilterLeafReader fReader = (FilterLeafReader) reader;
return segmentReader(FilterLeafReader.unwrap(fReader));
}
// hard fail - we can't get a SegmentReader
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
}
long getGcDeletesInMillis() {
return engineConfig.getGcDeletesInMillis();
}

View File

@ -16,11 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
package org.elasticsearch.index.engine;
public class InternalEngineFactory implements EngineFactory {
@Override

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.util.Accountable;

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.store.Store;
import java.util.concurrent.atomic.AtomicInteger;
/**
* RecoveryCounter keeps tracks of the number of ongoing recoveries for a
* particular {@link Store}
*/
public class RecoveryCounter implements Releasable {
private final Store store;
RecoveryCounter(Store store) {
this.store = store;
}
private final AtomicInteger onGoingRecoveries = new AtomicInteger();
void startRecovery() {
store.incRef();
onGoingRecoveries.incrementAndGet();
}
public int get() {
return onGoingRecoveries.get();
}
/**
* End the recovery counter by decrementing the store's ref and the ongoing recovery counter
* @return number of ongoing recoveries remaining
*/
int endRecovery() throws ElasticsearchException {
store.decRef();
int left = onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
return left;
}
@Override
public void close() throws ElasticsearchException {
endRecovery();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.internal.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
/**

View File

@ -35,7 +35,7 @@ public interface IndexShardRepository {
/**
* Creates a snapshot of the shard based on the index commit point.
* <p/>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.internal.InternalEngineHolder#snapshotIndex()} method.
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex()} method.
* IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p/>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check

View File

@ -97,35 +97,19 @@ class MultiDocumentPercolatorIndex implements PercolatorIndex {
return memoryIndex;
}
private class DocSearcher implements Engine.Searcher {
private class DocSearcher extends Engine.Searcher {
private final IndexSearcher searcher;
private final MemoryIndex rootDocMemoryIndex;
private DocSearcher(IndexSearcher searcher, MemoryIndex rootDocMemoryIndex) {
this.searcher = searcher;
super("percolate", searcher);
this.rootDocMemoryIndex = rootDocMemoryIndex;
}
@Override
public String source() {
return "percolate";
}
@Override
public IndexReader reader() {
return searcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return searcher;
}
@Override
public void close() throws ElasticsearchException {
try {
searcher.getIndexReader().close();
this.reader().close();
rootDocMemoryIndex.reset();
} catch (IOException e) {
throw new ElasticsearchException("failed to close IndexReader in percolator with nested doc", e);

View File

@ -23,10 +23,8 @@ package org.elasticsearch.percolator;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.engine.Engine;
@ -69,35 +67,19 @@ class SingleDocumentPercolatorIndex implements PercolatorIndex {
context.initialize(new DocEngineSearcher(memoryIndex), parsedDocument);
}
private class DocEngineSearcher implements Engine.Searcher {
private class DocEngineSearcher extends Engine.Searcher {
private final IndexSearcher searcher;
private final MemoryIndex memoryIndex;
public DocEngineSearcher(MemoryIndex memoryIndex) {
this.searcher = memoryIndex.createSearcher();
super("percolate", memoryIndex.createSearcher());
this.memoryIndex = memoryIndex;
}
@Override
public String source() {
return "percolate";
}
@Override
public IndexReader reader() {
return searcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return searcher;
}
@Override
public void close() throws ElasticsearchException {
try {
searcher.getIndexReader().close();
this.reader().close();
memoryIndex.reset();
} catch (IOException e) {
throw new ElasticsearchException("failed to close percolator in-memory index", e);

View File

@ -26,8 +26,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Ignore;

View File

@ -17,15 +17,13 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import com.google.common.base.Predicate;
import org.apache.lucene.index.LogByteSizeMergePolicy;

View File

@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import static org.hamcrest.Matchers.is;

View File

@ -17,16 +17,14 @@
* under the License.
*/
package org.elasticsearch.index.engine.internal;
package org.elasticsearch.index.engine;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Sets;
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
@ -37,15 +35,12 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.LuceneTest;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -55,10 +50,8 @@ import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
@ -93,16 +86,12 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.between;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLong;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.*;
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@ -1368,10 +1357,10 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
MockAppender mockAppender = new MockAppender();
// Works when running this test inside Intellij:
Logger iwIFDLogger = LogManager.exists("org.elasticsearch.index.engine.internal.lucene.iw.ifd");
Logger iwIFDLogger = LogManager.exists("org.elasticsearch.index.engine.lucene.iw.ifd");
if (iwIFDLogger == null) {
// Works when running this test from command line:
iwIFDLogger = LogManager.exists("index.engine.internal.lucene.iw.ifd");
iwIFDLogger = LogManager.exists("index.engine.lucene.iw.ifd");
assertNotNull(iwIFDLogger);
}

View File

@ -108,7 +108,7 @@ public class ChildrenConstantScoreQueryTests extends AbstractChildTests {
IndexReader indexReader = DirectoryReader.open(indexWriter.w, false);
IndexSearcher searcher = new IndexSearcher(indexReader);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(
SearchContext.current(), new Engine.SimpleSearcher(ChildrenConstantScoreQueryTests.class.getSimpleName(), searcher)
SearchContext.current(), new Engine.Searcher(ChildrenConstantScoreQueryTests.class.getSimpleName(), searcher)
));
TermQuery childQuery = new TermQuery(new Term("field1", "value" + (1 + random().nextInt(3))));
@ -199,7 +199,7 @@ public class ChildrenConstantScoreQueryTests extends AbstractChildTests {
indexWriter.commit();
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher searcher = new IndexSearcher(indexReader);
Engine.Searcher engineSearcher = new Engine.SimpleSearcher(
Engine.Searcher engineSearcher = new Engine.Searcher(
ChildrenConstantScoreQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));
@ -228,7 +228,7 @@ public class ChildrenConstantScoreQueryTests extends AbstractChildTests {
indexReader.close();
indexReader = DirectoryReader.open(indexWriter.w, true);
searcher = new IndexSearcher(indexReader);
engineSearcher = new Engine.SimpleSearcher(
engineSearcher = new Engine.Searcher(
ChildrenConstantScoreQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));

View File

@ -165,7 +165,7 @@ public class ChildrenQueryTests extends AbstractChildTests {
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher searcher = new IndexSearcher(indexReader);
Engine.Searcher engineSearcher = new Engine.SimpleSearcher(
Engine.Searcher engineSearcher = new Engine.Searcher(
ChildrenQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));
@ -194,7 +194,7 @@ public class ChildrenQueryTests extends AbstractChildTests {
indexReader.close();
indexReader = DirectoryReader.open(indexWriter.w, true);
searcher = new IndexSearcher(indexReader);
engineSearcher = new Engine.SimpleSearcher(
engineSearcher = new Engine.Searcher(
ChildrenConstantScoreQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));
@ -357,7 +357,7 @@ public class ChildrenQueryTests extends AbstractChildTests {
IndexSearcher searcher = new IndexSearcher(reader);
// setup to read the parent/child map
Engine.SimpleSearcher engineSearcher = new Engine.SimpleSearcher(ChildrenQueryTests.class.getSimpleName(), searcher);
Engine.Searcher engineSearcher = new Engine.Searcher(ChildrenQueryTests.class.getSimpleName(), searcher);
((TestSearchContext)context).setSearcher(new ContextIndexSearcher(context, engineSearcher));
// child query that returns the score as the value of "childScore" for each child document, with the parent's score determined by the score type

View File

@ -158,7 +158,7 @@ public class ParentConstantScoreQueryTests extends AbstractChildTests {
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher searcher = new IndexSearcher(indexReader);
Engine.Searcher engineSearcher = new Engine.SimpleSearcher(
Engine.Searcher engineSearcher = new Engine.Searcher(
ParentConstantScoreQuery.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));
@ -185,7 +185,7 @@ public class ParentConstantScoreQueryTests extends AbstractChildTests {
indexReader.close();
indexReader = DirectoryReader.open(indexWriter.w, true);
searcher = new IndexSearcher(indexReader);
engineSearcher = new Engine.SimpleSearcher(
engineSearcher = new Engine.Searcher(
ParentConstantScoreQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));

View File

@ -156,7 +156,7 @@ public class ParentQueryTests extends AbstractChildTests {
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher searcher = new IndexSearcher(indexReader);
Engine.Searcher engineSearcher = new Engine.SimpleSearcher(
Engine.Searcher engineSearcher = new Engine.Searcher(
ParentQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));
@ -183,7 +183,7 @@ public class ParentQueryTests extends AbstractChildTests {
indexReader.close();
indexReader = DirectoryReader.open(indexWriter.w, true);
searcher = new IndexSearcher(indexReader);
engineSearcher = new Engine.SimpleSearcher(
engineSearcher = new Engine.Searcher(
ParentConstantScoreQueryTests.class.getSimpleName(), searcher
);
((TestSearchContext) SearchContext.current()).setSearcher(new ContextIndexSearcher(SearchContext.current(), engineSearcher));

View File

@ -29,7 +29,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -131,7 +131,7 @@ public class MockInternalEngine extends InternalEngine {
return reader;
}
public final class AssertingSearcher implements Searcher {
public final class AssertingSearcher extends Searcher {
private final Searcher wrappedSearcher;
private final ShardId shardId;
private final IndexSearcher indexSearcher;
@ -140,6 +140,7 @@ public class MockInternalEngine extends InternalEngine {
private final int initialRefCount;
public AssertingSearcher(IndexSearcher indexSearcher, Searcher wrappedSearcher, ShardId shardId) {
super(wrappedSearcher.source(), indexSearcher);
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
// with a wrapped reader.
this.wrappedSearcher = wrappedSearcher;