Replace EngineClosedException with AlreadyClosedException (#22631)
`EngineClosedException` is an ES-level exception used to indicate that the engine is closed when an operation starts. It doesn't add much value: we can use `AlreadyClosedException` from Lucene instead, which may already bubble up if things go wrong during operations. Having two exceptions only adds confusion and leads to bugs, like the wrong handling of `EngineClosedException` when dealing with document-level failures. The latter was exposed by `IndexWithShadowReplicasIT`. This PR also removes the AwaitsFix from the `IndexWithShadowReplicasIT` tests (which is what caused this to be discovered). While debugging the source of the issue I found some mismatches in document uid management in the tests: the term that was passed to the engine didn't correspond to the uid in the parsed doc. Those are fixed as well.
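To make the two fixes concrete, here is a minimal, self-contained sketch of the pattern this change converges on (illustrative only: `SketchEngine`, `Doc`, and `uidTerm` are hypothetical stand-ins, not the actual Elasticsearch classes). Callers catch only Lucene's `AlreadyClosedException`, and the `Term` handed to the engine is always derived from the parsed document's own uid so the two can't drift apart:

```java
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;

// Hypothetical stand-in for ParsedDocument: it owns the canonical uid.
class Doc {
    private final String uid;
    Doc(String type, String id) { this.uid = type + "#" + id; }
    String uid() { return uid; }
}

// Hypothetical stand-in for the engine; mirrors the post-change contract:
// a closed engine surfaces as Lucene's AlreadyClosedException, nothing else.
class SketchEngine {
    private volatile boolean closed;

    void ensureOpen() {
        if (closed) {
            throw new AlreadyClosedException("engine is closed");
        }
    }

    // Deriving the Term from the doc itself keeps the term and the doc uid
    // in sync - the test bug the commit message describes.
    static Term uidTerm(Doc doc) {
        return new Term("_uid", doc.uid());
    }

    void index(Term uid, Doc doc) {
        // Same invariant as the assertion added to Engine.Index in the diff below.
        assert uid.text().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
        ensureOpen();
        // ... actual indexing elided ...
    }

    void close() { closed = true; }

    static void tryIndex(SketchEngine engine, Doc doc) {
        try {
            engine.index(uidTerm(doc), doc);
        } catch (AlreadyClosedException ex) {
            // fine - the engine was concurrently closed; no separate
            // EngineClosedException branch is needed anymore
        }
    }
}
```

The design point: `AlreadyClosedException` can already escape Lucene's `IndexWriter` when it is closed or hits a tragic event, so callers had to handle it anyway; standardizing on it removes the second, ES-only "closed" exception and the missed-catch bugs that came with it.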
This commit is contained in:
parent f30b1f82ee
commit d80e3eea6c

@@ -50,14 +50,12 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
@@ -391,8 +389,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             Exception failure = operationResult.getFailure();
             assert failure instanceof VersionConflictEngineException
                 || failure instanceof MapperParsingException
-                || failure instanceof EngineClosedException
-                || failure instanceof IndexShardClosedException
                 : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" +
                     " failures. got " + failure;
             if (!TransportActions.isShardNotAvailableException(failure)) {

@@ -44,7 +44,6 @@ import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
 import org.elasticsearch.index.cache.query.QueryCache;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
@@ -675,7 +674,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 if (translog.syncNeeded()) {
                     translog.sync();
                 }
-            } catch (EngineClosedException | AlreadyClosedException ex) {
+            } catch (AlreadyClosedException ex) {
                 // fine - continue;
             } catch (IOException e) {
                 logger.warn("failed to sync translog", e);
@@ -723,7 +722,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 case STARTED:
                     try {
                         shard.updateGlobalCheckpointOnPrimary();
-                    } catch (EngineClosedException | AlreadyClosedException ex) {
+                    } catch (AlreadyClosedException ex) {
                         // fine - continue, the shard was concurrently closed on us.
                     }
                     continue;

@@ -512,7 +512,7 @@ public abstract class Engine implements Closeable {
                     manager.release(searcher);
                 }
             }
-        } catch (EngineClosedException ex) {
+        } catch (AlreadyClosedException ex) {
             throw ex;
         } catch (Exception ex) {
             ensureOpen(); // throw EngineCloseException here if we are already closed
@@ -530,7 +530,7 @@ public abstract class Engine implements Closeable {
 
     protected void ensureOpen() {
         if (isClosed.get()) {
-            throw new EngineClosedException(shardId, failedEngine.get());
+            throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
         }
     }
 
@@ -1017,6 +1017,7 @@ public abstract class Engine implements Closeable {
         public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
                      long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
             super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
+            assert uid.bytes().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
             this.doc = doc;
             this.isRetry = isRetry;
             this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1282,7 +1283,7 @@ public abstract class Engine implements Closeable {
             logger.debug("flushing shard on close - this might take some time to sync files to disk");
             try {
                 flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
-            } catch (EngineClosedException ex) {
+            } catch (AlreadyClosedException ex) {
                 logger.debug("engine already closed - skipping flushAndClose");
             }
         } finally {

@@ -33,6 +33,7 @@ import java.io.IOException;
  *
  *
  */
+@Deprecated
 public class EngineClosedException extends IndexShardClosedException {
 
     public EngineClosedException(ShardId shardId) {

@@ -546,6 +546,10 @@ public class InternalEngine extends Engine {
                 // and set the error in operation.setFailure. In case of environment related errors, the failure
                 // is bubbled up
                 isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false;
+                if (failure instanceof AlreadyClosedException) {
+                    // ensureOpen throws AlreadyClosedException which is not a document level issue
+                    isDocumentFailure = false;
+                }
             } catch (Exception inner) {
                 // we failed checking whether the failure can fail the engine, treat it as a persistent engine failure
                 isDocumentFailure = false;
@@ -901,8 +905,6 @@ public class InternalEngine extends Engine {
         } catch (AlreadyClosedException e) {
             failOnTragicEvent(e);
             throw e;
-        } catch (EngineClosedException e) {
-            throw e;
         } catch (Exception e) {
             try {
                 failEngine("refresh failed", e);
@@ -949,8 +951,6 @@ public class InternalEngine extends Engine {
         } catch (AlreadyClosedException e) {
             failOnTragicEvent(e);
             throw e;
-        } catch (EngineClosedException e) {
-            throw e;
         } catch (Exception e) {
             try {
                 failEngine("writeIndexingBuffer failed", e);
@@ -1129,7 +1129,7 @@ public class InternalEngine extends Engine {
 
     @Override
     public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
-                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -1215,7 +1215,8 @@ public class InternalEngine extends Engine {
     }
 
     @SuppressWarnings("finally")
-    private void failOnTragicEvent(AlreadyClosedException ex) {
+    private boolean failOnTragicEvent(AlreadyClosedException ex) {
+        final boolean engineFailed;
         // if we are already closed due to some tragic exception
         // we need to fail the engine. it might have already been failed before
         // but we are double-checking it's failed and closed
@@ -1228,14 +1229,19 @@ public class InternalEngine extends Engine {
             }
         } else {
             failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
+            engineFailed = true;
         }
     } else if (translog.isOpen() == false && translog.getTragicException() != null) {
         failEngine("already closed by tragic event on the translog", translog.getTragicException());
-    } else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
+        engineFailed = true;
+    } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
         // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
         // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
         throw new AssertionError("Unexpected AlreadyClosedException", ex);
+    } else {
+        engineFailed = false;
     }
+    return engineFailed;
     }
 
     @Override
@@ -1248,8 +1254,7 @@ public class InternalEngine extends Engine {
             // exception that should only be thrown in a tragic event. we pass on the checks to failOnTragicEvent which will
             // throw and AssertionError if the tragic event condition is not met.
             if (e instanceof AlreadyClosedException) {
-                failOnTragicEvent((AlreadyClosedException)e);
-                return true;
+                return failOnTragicEvent((AlreadyClosedException)e);
             } else if (e != null &&
                     ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
                         || (translog.isOpen() == false && translog.getTragicException() == e))) {

@@ -197,9 +197,6 @@ public class ShadowEngine extends Engine {
             ensureOpen();
             searcherManager.maybeRefreshBlocking();
         } catch (AlreadyClosedException e) {
-            // This means there's a bug somewhere: don't suppress it
-            throw new AssertionError(e);
-        } catch (EngineClosedException e) {
             throw e;
         } catch (Exception e) {
             try {

@@ -75,7 +75,6 @@ import org.elasticsearch.index.cache.request.ShardRequestCache;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.EngineFactory;
@@ -622,7 +621,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}.
+     * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link AlreadyClosedException}.
     */
     public void refresh(String source) {
         verifyNotClosed();
@@ -1265,7 +1264,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             try {
                 Translog translog = engine.getTranslog();
                 return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
-            } catch (AlreadyClosedException | EngineClosedException ex) {
+            } catch (AlreadyClosedException ex) {
                 // that's fine we are already close - no need to flush
             }
         }
@@ -1304,7 +1303,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void activateThrottling() {
         try {
             getEngine().activateThrottling();
-        } catch (EngineClosedException ex) {
+        } catch (AlreadyClosedException ex) {
             // ignore
         }
     }
@@ -1312,13 +1311,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void deactivateThrottling() {
         try {
             getEngine().deactivateThrottling();
-        } catch (EngineClosedException ex) {
+        } catch (AlreadyClosedException ex) {
             // ignore
         }
     }
 
     private void handleRefreshException(Exception e) {
-        if (e instanceof EngineClosedException) {
+        if (e instanceof AlreadyClosedException) {
             // ignore
         } else if (e instanceof RefreshFailedEngineException) {
             RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
@@ -1530,7 +1529,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     Engine getEngine() {
         Engine engine = getEngineOrNull();
         if (engine == null) {
-            throw new EngineClosedException(shardId);
+            throw new AlreadyClosedException("engine is closed");
         }
         return engine;
     }
@@ -1667,7 +1666,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private Engine createNewEngine(EngineConfig config) {
         synchronized (mutex) {
             if (state == IndexShardState.CLOSED) {
-                throw new EngineClosedException(shardId);
+                throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
             }
             assert this.currentEngineReference.get() == null;
             Engine engine = newEngine(config);
@@ -1769,7 +1768,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         try {
             final Engine engine = getEngine();
             engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
-        } catch (EngineClosedException ex) {
+        } catch (AlreadyClosedException ex) {
            // that's fine since we already synced everything on engine close - this also is conform with the methods
            // documentation
        } catch (IOException ex) { // if this fails we are in deep shit - fail the request
@@ -1884,8 +1883,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * refresh listeners.
      * Otherwise <code>false</code>.
      *
-     * @throws EngineClosedException if the engine is already closed
-     * @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
+     * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed
      */
     public boolean isRefreshNeeded() {
         return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded());

@@ -21,6 +21,7 @@ package org.elasticsearch.indices;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -30,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -384,7 +384,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
     protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
         try {
             shard.checkIdle(inactiveTimeNS);
-        } catch (EngineClosedException e) {
+        } catch (AlreadyClosedException e) {
             logger.trace((Supplier<?>) () -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
         }
     }

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.support.replication;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.UnavailableShardsException;
@@ -55,7 +56,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -431,12 +431,12 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
     }
 
-    private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
+    private Exception randomRetryPrimaryException(ShardId shardId) {
         return randomFrom(
             new ShardNotFoundException(shardId),
             new IndexNotFoundException(shardId.getIndex()),
             new IndexShardClosedException(shardId),
-            new EngineClosedException(shardId),
+            new AlreadyClosedException(shardId + " primary is closed"),
             new ReplicationOperation.RetryOnPrimaryException(shardId, "hello")
         );
     }

@@ -48,7 +48,9 @@ import org.elasticsearch.index.cache.query.IndexQueryCache;
 import org.elasticsearch.index.cache.query.QueryCache;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.InternalEngineTests;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -247,7 +249,8 @@ public class IndexModuleTests extends ESTestCase {
         assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
         assertSame(listener, indexService.getIndexOperationListeners().get(1));
 
-        Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
+        ParsedDocument doc = InternalEngineTests.createParsedDoc("1", "test", null);
+        Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
         ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
         for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
             l.preIndex(shardId, index);

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.index;
 
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteResponse;
@@ -34,7 +33,6 @@ import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.common.Priority;
@@ -58,9 +56,6 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.transport.ConnectionProfile;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
@@ -91,7 +86,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 /**
  * Tests for indices that use shadow replicas and a shared filesystem
  */
-@LuceneTestCase.AwaitsFix(bugUrl = "fix this fails intermittently")
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class IndexWithShadowReplicasIT extends ESIntegTestCase {
 
@@ -459,7 +453,6 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
         assertHitCount(resp, numPhase1Docs + numPhase2Docs);
     }
 
-    @AwaitsFix(bugUrl = "uncaught exception")
     public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
         Path dataPath = createTempDir();
         Settings nodeSettings = Settings.builder()

File diff suppressed because it is too large

@@ -33,6 +33,7 @@ import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.IOUtils;
@@ -44,21 +45,18 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
-import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.RefreshListeners;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
@@ -83,7 +81,6 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasKey;
@@ -172,8 +169,8 @@ public class ShadowEngineTests extends ESTestCase {
     }
 
 
-    private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, ParseContext.Document document, BytesReference source, Mapping mappingsUpdate) {
-        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
+    private ParsedDocument testParsedDocument(String id, String type, String routing, ParseContext.Document document, BytesReference source, Mapping mappingsUpdate) {
+        Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
         Field versionField = new NumericDocValuesField("_version", 0);
         SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
         document.add(uidField);
@@ -254,8 +251,16 @@ public class ShadowEngineTests extends ESTestCase {
         return config;
     }
 
-    protected Term newUid(String id) {
-        return new Term("_uid", id);
+//    protected Term newUid(String id) {
+//        return new Term("_uid", id);
+//    }
+
+    protected Term newUid(ParsedDocument doc) {
+        return new Term("_uid", doc.uid());
+    }
+
+    private Engine.Index indexForDoc(ParsedDocument doc) {
+        return new Engine.Index(newUid(doc), doc);
     }
 
     protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
@@ -264,8 +269,8 @@ public class ShadowEngineTests extends ESTestCase {
 
     public void testCommitStats() {
         // create a doc and refresh
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
         CommitStats stats1 = replicaEngine.commitStats();
         assertThat(stats1.getGeneration(), greaterThan(0L));
@@ -296,11 +301,11 @@ public class ShadowEngineTests extends ESTestCase {
         assertThat(primaryEngine.segmentsStats(false).getMemoryInBytes(), equalTo(0L));
 
         // create a doc and refresh
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
-        ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, testDocumentWithTextField(), B_2, null);
-        primaryEngine.index(new Engine.Index(newUid("2"), doc2));
+        ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_2, null);
+        primaryEngine.index(indexForDoc(doc2));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(false);
@@ -358,8 +363,8 @@ public class ShadowEngineTests extends ESTestCase {
         assertThat(segments.get(0).isCompound(), equalTo(true));
 
 
-        ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, testDocumentWithTextField(), B_3, null);
-        primaryEngine.index(new Engine.Index(newUid("3"), doc3));
+        ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_3, null);
+        primaryEngine.index(indexForDoc(doc3));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(false);
@@ -408,7 +413,7 @@ public class ShadowEngineTests extends ESTestCase {
         assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
         assertThat(segments.get(1).isCompound(), equalTo(true));
 
-        primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
+        primaryEngine.delete(new Engine.Delete("test", "1", newUid(doc)));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(false);
@@ -430,8 +435,8 @@ public class ShadowEngineTests extends ESTestCase {
         primaryEngine.flush();
         replicaEngine.refresh("test");
 
-        ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, testDocumentWithTextField(), B_3, null);
-        primaryEngine.index(new Engine.Index(newUid("4"), doc4));
+        ParsedDocument doc4 = testParsedDocument("4", "test", null, testDocumentWithTextField(), B_3, null);
+        primaryEngine.index(indexForDoc(doc4));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(false);
@@ -463,19 +468,19 @@ public class ShadowEngineTests extends ESTestCase {
         List<Segment> segments = primaryEngine.segments(true);
         assertThat(segments.isEmpty(), equalTo(true));
 
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(true);
         assertThat(segments.size(), equalTo(1));
         assertThat(segments.get(0).ramTree, notNullValue());
 
-        ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, testDocumentWithTextField(), B_2, null);
-        primaryEngine.index(new Engine.Index(newUid("2"), doc2));
+        ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_2, null);
+        primaryEngine.index(indexForDoc(doc2));
         primaryEngine.refresh("test");
-        ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, testDocumentWithTextField(), B_3, null);
-        primaryEngine.index(new Engine.Index(newUid("3"), doc3));
+        ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_3, null);
+        primaryEngine.index(indexForDoc(doc3));
         primaryEngine.refresh("test");
 
         segments = primaryEngine.segments(true);
@@ -500,9 +505,9 @@ public class ShadowEngineTests extends ESTestCase {
         // create a document
         ParseContext.Document document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
+        ParsedDocument doc = testParsedDocument("1", "test", null, document, B_1, null);
         try {
-            replicaEngine.index(new Engine.Index(newUid("1"), doc));
+            replicaEngine.index(indexForDoc(doc));
             fail("should have thrown an exception");
         } catch (UnsupportedOperationException e) {}
         replicaEngine.refresh("test");
@@ -512,16 +517,16 @@ public class ShadowEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
         searchResult.close();
-        Engine.GetResult getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
+        Engine.GetResult getResult = replicaEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(false));
         getResult.release();
 
         // index a document
         document = testDocument();
         document.add(new TextField("value", "test1", Field.Store.YES));
-        doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
+        doc = testParsedDocument("1", "test", null, document, B_1, null);
         try {
-            replicaEngine.index(new Engine.Index(newUid("1"), doc));
+            replicaEngine.index(indexForDoc(doc));
             fail("should have thrown an exception");
         } catch (UnsupportedOperationException e) {}
         replicaEngine.refresh("test");
@@ -531,15 +536,15 @@ public class ShadowEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
         searchResult.close();
-        getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = replicaEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(false));
         getResult.release();
 
         // Now, add a document to the primary so we can test shadow engine deletes
         document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
-        doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        doc = testParsedDocument("1", "test", null, document, B_1, null);
+        primaryEngine.index(indexForDoc(doc));
         primaryEngine.flush();
         replicaEngine.refresh("test");
 
@@ -550,14 +555,14 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // And the replica can retrieve it
-        getResult = replicaEngine.get(new Engine.Get(false, newUid("1")));
+        getResult = replicaEngine.get(new Engine.Get(false, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
 
         // try to delete it on the replica
         try {
-            replicaEngine.delete(new Engine.Delete("test", "1", newUid("1")));
+            replicaEngine.delete(new Engine.Delete("test", "1", newUid(doc)));
             fail("should have thrown an exception");
         } catch (UnsupportedOperationException e) {}
         replicaEngine.flush();
@@ -569,7 +574,7 @@ public class ShadowEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
         searchResult.close();
-        getResult = replicaEngine.get(new Engine.Get(false, newUid("1")));
+        getResult = replicaEngine.get(new Engine.Get(false, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -579,7 +584,7 @@ public class ShadowEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
         searchResult.close();
-        getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(false, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -593,8 +598,8 @@ public class ShadowEngineTests extends ESTestCase {
         // create a document
         ParseContext.Document document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, document, B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
         // its not there...
         searchResult = primaryEngine.acquireSearcher("test");
@@ -609,18 +614,18 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // but, we can still get it (in realtime)
-        Engine.GetResult getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
+        Engine.GetResult getResult = primaryEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
 
         // can't get it from the replica, because it's not in the translog for a shadow replica
-        getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = replicaEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(false));
         getResult.release();
 
         // but, not there non realtime
-        getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(false, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         getResult.release();
 
@@ -631,7 +636,7 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // also in non realtime
-        getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(false, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -646,8 +651,8 @@ public class ShadowEngineTests extends ESTestCase {
         document = testDocument();
         document.add(new TextField("value", "test1", Field.Store.YES));
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_2), SourceFieldMapper.Defaults.FIELD_TYPE));
-        doc = testParsedDocument("1", "1", "test", null, document, B_2, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        doc = testParsedDocument("1", "test", null, document, B_2, null);
+        primaryEngine.index(indexForDoc(doc));
 
         // its not updated yet...
         searchResult = primaryEngine.acquireSearcher("test");
@@ -657,7 +662,7 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // but, we can still get it (in realtime)
-        getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -690,7 +695,7 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // now delete
-        primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
+        primaryEngine.delete(new Engine.Delete("test", "1", newUid(doc)));
 
         // its not deleted yet
         searchResult = primaryEngine.acquireSearcher("test");
@@ -700,7 +705,7 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // but, get should not see it (in realtime)
-        getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(false));
         getResult.release();
 
@@ -716,8 +721,8 @@ public class ShadowEngineTests extends ESTestCase {
         // add it back
         document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
-        doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        doc = testParsedDocument("1", "test", null, document, B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
         // its not there...
         searchResult = primaryEngine.acquireSearcher("test");
@@ -740,7 +745,7 @@ public class ShadowEngineTests extends ESTestCase {
         primaryEngine.flush();
 
         // and, verify get (in real time)
-        getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = primaryEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -752,7 +757,7 @@ public class ShadowEngineTests extends ESTestCase {
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
         MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
         searchResult.close();
-        getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
+        getResult = replicaEngine.get(new Engine.Get(true, newUid(doc)));
         assertThat(getResult.exists(), equalTo(true));
         assertThat(getResult.docIdAndVersion(), notNullValue());
         getResult.release();
@@ -761,8 +766,8 @@ public class ShadowEngineTests extends ESTestCase {
         // now do an update
         document = testDocument();
         document.add(new TextField("value", "test1", Field.Store.YES));
-        doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        doc = testParsedDocument("1", "test", null, document, B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
         // its not updated yet...
         searchResult = primaryEngine.acquireSearcher("test");
@@ -797,8 +802,8 @@ public class ShadowEngineTests extends ESTestCase {
         searchResult.close();
 
         // create a document
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
 
         // its not there...
         searchResult = primaryEngine.acquireSearcher("test");
@@ -827,7 +832,7 @@ public class ShadowEngineTests extends ESTestCase {
         // don't release the replica search result yet...
 
         // delete, refresh and do a new search, it should not be there
-        primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
+        primaryEngine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc)));
         primaryEngine.flush();
         primaryEngine.refresh("test");
         replicaEngine.refresh("test");
@@ -842,8 +847,8 @@ public class ShadowEngineTests extends ESTestCase {
     }
 
     public void testFailEngineOnCorruption() {
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
         primaryEngine.flush();
         MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class);
         leaf.setRandomIOExceptionRate(1.0);
@@ -860,7 +865,7 @@ public class ShadowEngineTests extends ESTestCase {
             MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
             searchResult.close();
             fail("exception expected");
-        } catch (EngineClosedException ex) {
+        } catch (AlreadyClosedException ex) {
             // all is well
         }
     }
@@ -879,8 +884,8 @@ public class ShadowEngineTests extends ESTestCase {
      */
    public void testFailStart() throws IOException {
        // Need a commit point for this
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocumentWithTextField(), B_1, null);
-        primaryEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
+        primaryEngine.index(indexForDoc(doc));
        primaryEngine.flush();
 
        // this test fails if any reader, searcher or directory is not closed - MDW FTW
@@ -965,8 +970,8 @@ public class ShadowEngineTests extends ESTestCase {
         // create a document
         ParseContext.Document document = testDocumentWithTextField();
         document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, document, B_1, null);
-        pEngine.index(new Engine.Index(newUid("1"), doc));
+        ParsedDocument doc = testParsedDocument("1", "test", null, document, B_1, null);
+        pEngine.index(indexForDoc(doc));
         pEngine.flush(true, true);
 
         t.join();

@@ -214,7 +214,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
         assertEquals("b", fields[1].stringValue());
 
         IndexShard shard = indexService.getShard(0);
-        shard.index(new Engine.Index(new Term("_uid", "1"), doc));
+        shard.index(new Engine.Index(new Term("_uid", doc.uid()), doc));
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -253,7 +253,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
         assertEquals("b", fields[1].stringValue());
 
         IndexShard shard = indexService.getShard(0);
-        shard.index(new Engine.Index(new Term("_uid", "1"), doc));
+        shard.index(new Engine.Index(new Term("_uid", doc.uid()), doc));
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();

@@ -56,6 +56,7 @@ import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.translog.Translog;
@@ -100,9 +101,9 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         return pluginList(InternalSettingsPlugin.class);
     }
 
-    private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long seqNo,
+    private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo,
                                               ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
-        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
+        Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
         Field versionField = new NumericDocValuesField("_version", 0);
         SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
         document.add(uidField);
@@ -325,14 +326,13 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         client().prepareIndex("test", "test", "0").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
         assertFalse(shard.shouldFlush());
         ParsedDocument doc = testParsedDocument(
             "1",
-            "1",
             "test",
             null,
             SequenceNumbersService.UNASSIGNED_SEQ_NO,
             new ParseContext.Document(),
             new BytesArray(new byte[]{1}), null);
-        Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
+        Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
         shard.index(index);
         assertTrue(shard.shouldFlush());
         assertEquals(2, shard.getEngine().getTranslog().totalOperations());

@@ -29,6 +29,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Constants;
 import org.elasticsearch.Version;
@@ -547,9 +548,9 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
-    private ParsedDocument testParsedDocument(String uid, String id, String type, String routing,
+    private ParsedDocument testParsedDocument(String id, String type, String routing,
                                               ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
-        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
+        Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
         Field versionField = new NumericDocValuesField("_version", 0);
         SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
         document.add(uidField);
@@ -619,9 +620,9 @@ public class IndexShardTests extends IndexShardTestCase {
         });
         recoveryShardFromStore(shard);
 
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, new ParseContext.Document(),
+        ParsedDocument doc = testParsedDocument("1", "test", null, new ParseContext.Document(),
             new BytesArray(new byte[]{1}), null);
-        Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
+        Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
         shard.index(index);
         assertEquals(1, preIndex.get());
         assertEquals(1, postIndexCreate.get());
@@ -640,7 +641,7 @@ public class IndexShardTests extends IndexShardTestCase {
         assertEquals(0, postDelete.get());
         assertEquals(0, postDeleteException.get());
 
-        Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
+        Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", doc.uid()));
         shard.delete(delete);
 
         assertEquals(2, preIndex.get());
@@ -657,7 +658,7 @@ public class IndexShardTests extends IndexShardTestCase {
         try {
             shard.index(index);
             fail();
-        } catch (IllegalIndexShardStateException e) {
+        } catch (AlreadyClosedException e) {
 
         }
 
@@ -671,7 +672,7 @@ public class IndexShardTests extends IndexShardTestCase {
         try {
             shard.delete(delete);
             fail();
-        } catch (IllegalIndexShardStateException e) {
+        } catch (AlreadyClosedException e) {
 
         }
 
@@ -1376,10 +1377,10 @@ public class IndexShardTests extends IndexShardTestCase {
         for (int i = 0; i < numDocs; i++) {
             final String id = Integer.toString(i);
             final ParsedDocument doc =
-                testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
+                testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
             final Engine.Index index =
                 new Engine.Index(
-                    new Term("_uid", id),
+                    new Term("_uid", doc.uid()),
                     doc,
                     SequenceNumbersService.UNASSIGNED_SEQ_NO,
                     0,
@@ -1406,10 +1407,10 @@ public class IndexShardTests extends IndexShardTestCase {
         for (final Integer i : ids) {
             final String id = Integer.toString(i);
             final ParsedDocument doc =
-                testParsedDocument(id, id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
+                testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
             final Engine.Index index =
                 new Engine.Index(
-                    new Term("_uid", id),
+                    new Term("_uid", doc.uid()),
                     doc,
                     SequenceNumbersService.UNASSIGNED_SEQ_NO,
                     0,

@@ -21,6 +21,8 @@ package org.elasticsearch.index.shard;
 import org.apache.lucene.index.Term;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.test.ESTestCase;
 
@@ -131,9 +133,10 @@ public class IndexingOperationListenerTests extends ESTestCase{
         }
         Collections.shuffle(indexingOperationListeners, random());
         IndexingOperationListener.CompositeListener compositeListener =
-            new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
-        Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
-        Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
+                new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
+        ParsedDocument doc = InternalEngineTests.createParsedDoc("1", "test", null);
+        Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", doc.uid()));
+        Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
         compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true));
         assertEquals(0, preIndex.get());
         assertEquals(0, postIndex.get());

@@ -48,6 +48,7 @@ import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.store.DirectoryService;
 import org.elasticsearch.index.store.Store;
@@ -297,7 +298,7 @@ public class RefreshListenersTests extends ESTestCase {
             }
             listener.assertNoError();
 
-            Engine.Get get = new Engine.Get(false, new Term("_uid", "test:"+threadId));
+            Engine.Get get = new Engine.Get(false, new Term("_uid", Uid.createUid("test", threadId)));
             try (Engine.GetResult getResult = engine.get(get)) {
                 assertTrue("document not found", getResult.exists());
                 assertEquals(iteration, getResult.version());
@@ -328,7 +329,7 @@ public class RefreshListenersTests extends ESTestCase {
         String uid = type + ":" + id;
         Document document = new Document();
         document.add(new TextField("test", testFieldValue, Field.Store.YES));
-        Field uidField = new Field("_uid", type + ":" + id, UidFieldMapper.Defaults.FIELD_TYPE);
+        Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
         Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
         SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
         document.add(uidField);
@@ -338,7 +339,7 @@ public class RefreshListenersTests extends ESTestCase {
         document.add(seqID.primaryTerm);
         BytesReference source = new BytesArray(new byte[] { 1 });
         ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null);
-        Engine.Index index = new Engine.Index(new Term("_uid", uid), doc);
+        Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
         return engine.index(index);
     }
 

@@ -56,6 +56,7 @@ import org.elasticsearch.index.engine.Engine.Operation.Origin;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.ShardId;
@@ -625,8 +626,12 @@ public class TranslogTests extends ESTestCase {
         }
     }
 
-    private Term newUid(String id) {
-        return new Term("_uid", id);
+    private Term newUid(ParsedDocument doc) {
+        return new Term("_uid", doc.uid());
+    }
+
+    private Term newUid(String uid) {
+        return new Term("_uid", uid);
     }
 
     public void testVerifyTranslogIsNotDeleted() throws IOException {
@@ -2014,7 +2019,7 @@ public class TranslogTests extends ESTestCase {
         seqID.seqNo.setLongValue(randomSeqNum);
         seqID.seqNoDocValue.setLongValue(randomSeqNum);
         seqID.primaryTerm.setLongValue(randomPrimaryTerm);
-        Field uidField = new Field("_uid", "1", UidFieldMapper.Defaults.FIELD_TYPE);
+        Field uidField = new Field("_uid", Uid.createUid("test", "1"), UidFieldMapper.Defaults.FIELD_TYPE);
         Field versionField = new NumericDocValuesField("_version", 1);
         Document document = new Document();
         document.add(new TextField("value", "test", Field.Store.YES));
@@ -2025,7 +2030,7 @@ public class TranslogTests extends ESTestCase {
         document.add(seqID.primaryTerm);
         ParsedDocument doc = new ParsedDocument(versionField, seqID, "1", "type", null, Arrays.asList(document), B_1, null);
 
-        Engine.Index eIndex = new Engine.Index(newUid("1"), doc, randomSeqNum, randomPrimaryTerm,
+        Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
             1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
         Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomSeqNum, true);
         Translog.Index index = new Translog.Index(eIndex, eIndexResult);
@@ -2036,7 +2041,7 @@ public class TranslogTests extends ESTestCase {
         Translog.Index serializedIndex = new Translog.Index(in);
         assertEquals(index, serializedIndex);
 
-        Engine.Delete eDelete = new Engine.Delete("type", "1", newUid("1"), randomSeqNum, randomPrimaryTerm,
+        Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
             2, VersionType.INTERNAL, Origin.PRIMARY, 0);
         Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomSeqNum, true);
         Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);