core: disable auto gen id optimization
This pr removes the optimization for auto generated ids. Previously, when ids were auto generated by elasticsearch then there was no check to see if a document with same id already existed and instead the new document was only appended. However, due to lucene improvements this optimization does not add much value. In addition, under rare circumstances it might cause duplicate documents: When an indexing request is retried (due to connect lost, node closed etc), then a flag 'canHaveDuplicates' is set to true for the indexing request that is send a second time. This was to make sure that even when an indexing request for a document with autogenerated id comes in we do not have to update unless this flag is set and instead only append. However, it might happen that for a retry or for the replication the indexing request that has the canHaveDuplicates set to true (the retried request) arrives at the destination before the original request that does have it set false. In this case both request add a document and we have a duplicated a document. This commit adds a workaround: remove the optimization for auto generated ids and always update the document. The asumtion is that this will not slow down indexing more than 10 percent, see: http://benchmarks.elasticsearch.org/ closes #8788 closes #9468
This commit is contained in:
parent
e412dab63a
commit
0a07ce8916
|
@ -212,7 +212,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
/* create engine config */
|
/* create engine config */
|
||||||
|
|
||||||
this.config = new EngineConfig(shardId,
|
this.config = new EngineConfig(shardId,
|
||||||
indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, true),
|
indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),
|
||||||
threadPool,indexingService,indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
|
threadPool,indexingService,indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
|
||||||
analysisService.defaultIndexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener);
|
analysisService.defaultIndexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener);
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,9 @@ import org.apache.lucene.document.Field;
|
||||||
import org.apache.lucene.document.NumericDocValuesField;
|
import org.apache.lucene.document.NumericDocValuesField;
|
||||||
import org.apache.lucene.document.TextField;
|
import org.apache.lucene.document.TextField;
|
||||||
import org.apache.lucene.index.*;
|
import org.apache.lucene.index.*;
|
||||||
|
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||||
import org.apache.lucene.search.TermQuery;
|
import org.apache.lucene.search.TermQuery;
|
||||||
|
import org.apache.lucene.search.TopDocs;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
@ -52,6 +54,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
|
||||||
import org.elasticsearch.index.engine.*;
|
import org.elasticsearch.index.engine.*;
|
||||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||||
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
||||||
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
||||||
|
@ -231,7 +234,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
|
||||||
|
|
||||||
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
|
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
|
||||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
EngineConfig config = new EngineConfig(shardId, true, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
||||||
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
|
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
|
||||||
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
|
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1559,4 +1562,88 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
|
||||||
|
|
||||||
|
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
|
||||||
|
boolean canHaveDuplicates = false;
|
||||||
|
boolean autoGeneratedId = true;
|
||||||
|
|
||||||
|
Engine.Create index = new Engine.Create(null, analyzer, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
engine.create(index);
|
||||||
|
assertThat(index.version(), equalTo(1l));
|
||||||
|
|
||||||
|
index = new Engine.Create(null, analyzer, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
replicaEngine.create(index);
|
||||||
|
assertThat(index.version(), equalTo(1l));
|
||||||
|
|
||||||
|
canHaveDuplicates = true;
|
||||||
|
index = new Engine.Create(null, analyzer, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
engine.create(index);
|
||||||
|
assertThat(index.version(), equalTo(1l));
|
||||||
|
engine.refresh("test", true);
|
||||||
|
Engine.Searcher searcher = engine.acquireSearcher("test");
|
||||||
|
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||||
|
assertThat(topDocs.totalHits, equalTo(1));
|
||||||
|
|
||||||
|
index = new Engine.Create(null, analyzer, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
try {
|
||||||
|
replicaEngine.create(index);
|
||||||
|
fail();
|
||||||
|
} catch (VersionConflictEngineException e) {
|
||||||
|
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException
|
||||||
|
}
|
||||||
|
replicaEngine.refresh("test", true);
|
||||||
|
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
|
||||||
|
topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||||
|
assertThat(topDocs.totalHits, equalTo(1));
|
||||||
|
searcher.close();
|
||||||
|
replicaSearcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() throws IOException {
|
||||||
|
|
||||||
|
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
|
||||||
|
boolean canHaveDuplicates = true;
|
||||||
|
boolean autoGeneratedId = true;
|
||||||
|
|
||||||
|
Engine.Create firstIndexRequest = new Engine.Create(null, analyzer, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
engine.create(firstIndexRequest);
|
||||||
|
assertThat(firstIndexRequest.version(), equalTo(1l));
|
||||||
|
|
||||||
|
Engine.Create firstIndexRequestReplica = new Engine.Create(null, analyzer, newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
replicaEngine.create(firstIndexRequestReplica);
|
||||||
|
assertThat(firstIndexRequestReplica.version(), equalTo(1l));
|
||||||
|
|
||||||
|
canHaveDuplicates = false;
|
||||||
|
Engine.Create secondIndexRequest = new Engine.Create(null, analyzer, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
try {
|
||||||
|
engine.create(secondIndexRequest);
|
||||||
|
fail();
|
||||||
|
} catch (DocumentAlreadyExistsException e) {
|
||||||
|
// we can ignore the exception. In case this happens because the retry request arrived first then this error will not be sent back anyway.
|
||||||
|
// in any other case this is an actual error
|
||||||
|
}
|
||||||
|
engine.refresh("test", true);
|
||||||
|
Engine.Searcher searcher = engine.acquireSearcher("test");
|
||||||
|
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||||
|
assertThat(topDocs.totalHits, equalTo(1));
|
||||||
|
|
||||||
|
Engine.Create secondIndexRequestReplica = new Engine.Create(null, analyzer, newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
|
||||||
|
try {
|
||||||
|
replicaEngine.create(secondIndexRequestReplica);
|
||||||
|
fail();
|
||||||
|
} catch (VersionConflictEngineException e) {
|
||||||
|
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException.
|
||||||
|
}
|
||||||
|
replicaEngine.refresh("test", true);
|
||||||
|
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
|
||||||
|
topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||||
|
assertThat(topDocs.totalHits, equalTo(1));
|
||||||
|
searcher.close();
|
||||||
|
replicaSearcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,6 @@ public class ExceptionRetryTests extends ElasticsearchIntegrationTest {
|
||||||
* see https://github.com/elasticsearch/elasticsearch/issues/8788
|
* see https://github.com/elasticsearch/elasticsearch/issues/8788
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/8788")
|
|
||||||
public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, InterruptedException, IOException {
|
public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, InterruptedException, IOException {
|
||||||
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
|
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
|
||||||
int numDocs = scaledRandomIntBetween(100, 1000);
|
int numDocs = scaledRandomIntBetween(100, 1000);
|
||||||
|
|
Loading…
Reference in New Issue