diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 30f38230bc9..21dd799122e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -415,7 +415,7 @@ public class TransportShardBulkAction extends TransportWriteAction entry : writer.getLiveCommitData()) { @@ -1071,6 +1093,7 @@ public class InternalEngine extends Engine { } private NoOpResult innerNoOp(final NoOp noOp) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED; final long seqNo = noOp.seqNo(); try { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index a8a983ecde8..52e3001da84 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -23,22 +23,15 @@ import java.util.Objects; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; public class SourceToParse { - public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return source(Origin.PRIMARY, index, type, id, source, contentType); - } - - public static SourceToParse source(Origin origin, String index, String type, String id, BytesReference source, + public static SourceToParse source(String index, String type, String id, BytesReference source, XContentType contentType) { - return new SourceToParse(origin, index, type, id, source, contentType); + return new SourceToParse(index, type, id, source, contentType); } - private final Origin origin; - private final BytesReference source; private final String index; @@ -53,8 +46,7 @@ public class SourceToParse { private XContentType xContentType; - private SourceToParse(Origin origin, String index, String type, String id, BytesReference source, XContentType xContentType) { - this.origin = Objects.requireNonNull(origin); + private SourceToParse(String index, String type, String id, BytesReference source, XContentType xContentType) { this.index = Objects.requireNonNull(index); this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); @@ -64,10 +56,6 @@ public class SourceToParse { this.xContentType = Objects.requireNonNull(xContentType); } - public Origin origin() { - return origin; - } - public BytesReference source() { return this.source; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 032c033f175..5d5e17c1929 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -364,6 +364,8 @@ final class StoreRecovery { logger.debug("failed to list file details", e); } indexShard.performTranslogRecovery(indexShouldExists); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3792872c9e3..f1b981e14d2 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -114,6 +114,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -191,7 +192,7 @@ import static org.hamcrest.Matchers.nullValue; public class InternalEngineTests extends ESTestCase { - protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1); + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; @@ -1956,7 +1957,7 @@ public class InternalEngineTests extends ESTestCase { @Override public void append(LogEvent event) { final String formattedMessage = event.getMessage().getFormattedMessage(); - if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][1] ")) { + if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0] ")) { if (event.getLoggerName().endsWith(".IW") && formattedMessage.contains("IW: apply all deletes during flush")) { sawIndexWriterMessage = true; @@ -2335,7 +2336,7 @@ public class InternalEngineTests extends ESTestCase { private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } @@ -3836,4 +3837,79 @@ public class InternalEngineTests extends ESTestCase { } } + public void testFillUpSequenceIdGapsOnRecovery() throws IOException { + final int docs = randomIntBetween(1, 32); + int numDocsOnReplica = 0; + long maxSeqIDOnReplica = -1; + long checkpointOnReplica; + try { + for (int i = 0; i < docs; i++) { + final String docId = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + Engine.IndexResult indexResult = engine.index(primaryResponse); + if (randomBoolean()) { + numDocsOnReplica++; + maxSeqIDOnReplica = indexResult.getSeqNo(); + replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); + } + } + checkpointOnReplica = replicaEngine.seqNoService().getLocalCheckpoint(); + } finally { + IOUtils.close(replicaEngine); + } + + + boolean flushed = false; + Engine recoveringEngine = null; + try { + assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); + assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); + assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations()); + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); + + // now snapshot the tlog and ensure the primary term is updated + Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); + assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); + Translog.Operation operation; + while((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + assertEquals(2, operation.primaryTerm()); + } else { + assertEquals(1, operation.primaryTerm()); + } + + } + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + if ((flushed = randomBoolean())) { + recoveringEngine.flush(true, true); + } + } finally { + IOUtils.close(recoveringEngine); + } + + // now do it again to make sure we preserve values etc. + try { + recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + if (flushed) { + assertEquals(0, recoveringEngine.getTranslog().totalOperations()); + } + recoveringEngine.recoverFromTranslog(); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + } finally { + IOUtils.close(recoveringEngine); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index eccd958c36e..3f01a0c0a9a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -896,6 +898,46 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(newShard); } + /* This test just verifies that we fill up local checkpoint up to max seen seqID on primary recovery */ + public void testRecoverFromStoreWithNoOps() throws IOException { + final IndexShard shard = newStartedShard(true); + indexDoc(shard, "test", "0"); + Engine.Index test = indexDoc(shard, "test", "1"); + // start a replica shard and index the second doc + final IndexShard otherShard = newStartedShard(false); + test = otherShard.prepareIndexOnReplica( + SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), + XContentType.JSON), + 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + otherShard.index(test); + + final ShardRouting primaryShardRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + assertEquals(1, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot(); + Translog.Operation operation; + int numNoops = 0; + while((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + numNoops++; + assertEquals(1, operation.primaryTerm()); + assertEquals(0, operation.seqNo()); + } + } + assertEquals(1, numNoops); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 1); + assertDocCount(shard, 2); + closeShards(newShard, shard); + } + public void testRecoverFromCleanStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0"); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index bbe4d8ed12e..261e53064fe 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -44,7 +44,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { long seqNo = 0; for (int i = 0; i < docs; i++) { Engine.Index indexOp = replica.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.REPLICA, index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), + SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); replica.index(indexOp); if (rarely()) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4062666ddbb..65576dcf0a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -484,7 +484,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), Versions.MATCH_ANY, VersionType.INTERNAL, @@ -492,7 +492,7 @@ public abstract class IndexShardTestCase extends ESTestCase { false); } else { index = shard.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source), + SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); }