diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f0aede639e6..597b27eae4b 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -446,6 +446,7 @@ public class TransportShardBulkAction extends TransportWriteAction shard term [" + this.primaryTerm + "]"; + return prepareIndex(docMapper(source.type()), source, opSeqNo, opPrimaryTerm, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry); } catch (Exception e) { verifyNotClosed(e); @@ -594,11 +595,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl versionType, Engine.Operation.Origin.PRIMARY); } - public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm, + public Engine.Delete prepareDeleteOnReplica(String type, String id, long opSeqNo, long opPrimaryTerm, long version, VersionType versionType) { verifyReplicationTarget(); + assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; final Term uid = extractUidForDelete(type, id); - return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); + return prepareDelete(type, id, uid, opSeqNo, opPrimaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); } private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, @@ -1875,10 +1877,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl if (operationPrimaryTerm > primaryTerm) { try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { - assert operationPrimaryTerm > primaryTerm; + assert operationPrimaryTerm > primaryTerm : + "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; + getEngine().getTranslog().rollGeneration(); }); - } catch (final InterruptedException | TimeoutException e) { + } catch (final Exception e) { onPermitAcquired.onFailure(e); return; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 016067259c1..fea26168efa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -70,7 +71,8 @@ final class IndexShardOperationPermits implements Closeable { * @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws IndexShardClosedException if operation permit has been closed */ - public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException { + public void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable onBlocked) throws + InterruptedException, TimeoutException, E { if (closed) { throw new IndexShardClosedException(shardId); } @@ -109,9 +111,9 @@ final class IndexShardOperationPermits implements Closeable { /** * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided - * {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, - * permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no - * longer blocked. + * {@link ActionListener} will be called on the calling thread. During calls of + * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will + * then be called using the provided executor once operations are no longer blocked. * * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for delayed call diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 8d2be410413..7b6922e7867 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -910,11 +910,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp(); } - public Index(String type, String id, byte[] source) { + public Index(String type, String id, long seqNo, byte[] source) { this.type = type; this.id = id; this.source = new BytesArray(source); - this.seqNo = 0; + this.seqNo = seqNo; version = Versions.MATCH_ANY; versionType = VersionType.INTERNAL; routing = null; @@ -1037,9 +1037,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public String toString() { return "Index{" + - "id='" + id + '\'' + - ", type='" + type + '\'' + - '}'; + "id='" + id + '\'' + + ", type='" + type + '\'' + + ", seqNo=" + seqNo + + ", primaryTerm=" + primaryTerm + + '}'; } public long getAutoGeneratedIdTimestamp() { @@ -1079,8 +1081,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } /** utility for testing */ - public Delete(String type, String id, Term uid) { - this(type, id, uid, 0, 0, Versions.MATCH_ANY, VersionType.INTERNAL); + public Delete(String type, String id, long seqNo, Term uid) { + this(type, id, uid, seqNo, 0, Versions.MATCH_ANY, VersionType.INTERNAL); } public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) { @@ -1180,10 +1182,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public String toString() { return "Delete{" + - "uid=" + uid + - '}'; + "uid=" + uid + + ", seqNo=" + seqNo + + ", primaryTerm=" + primaryTerm + + '}'; } - } public static class NoOp implements Operation { @@ -1260,9 +1263,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode(); } + @Override + public String toString() { + return "NoOp{" + + "seqNo=" + seqNo + + ", primaryTerm=" + primaryTerm + + ", reason='" + reason + '\'' + + '}'; + } } - public enum Durability { /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index daf9a44b666..4a98365e02f 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -24,7 +24,10 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Assertions; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -39,6 +42,8 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -71,6 +76,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { // lock order synchronized(syncLock) -> synchronized(this) private final Object syncLock = new Object(); + private final Map> seenSequenceNumbers; + private TranslogWriter( final ChannelFactory channelFactory, final ShardId shardId, @@ -90,6 +97,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo; this.maxSeqNo = initialCheckpoint.maxSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; } static int getHeaderLength(String translogUUID) { @@ -195,9 +203,30 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { operationCounter++; + assert assertNoSeqNumberConflict(seqNo, data); + return new Translog.Location(generation, offset, data.length()); } + private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException { + if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + // nothing to do + } else if (seenSequenceNumbers.containsKey(seqNo)) { + final Tuple previous = seenSequenceNumbers.get(seqNo); + if (previous.v1().equals(data) == false) { + Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); + Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); + throw new AssertionError( + "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + } + } else { + seenSequenceNumbers.put(seqNo, + new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op"))); + } + return true; + } + /** * write all buffered ops to disk and fsync file. * diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 20406acc9b6..39a4bb2feca 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -631,16 +631,17 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(false); - DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, randomBoolean()); + DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 17, 0, 1, randomBoolean()); IndexRequest request = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "field", "value"); Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica( - primaryResponse, request, shard); + primaryResponse, request, shard.getPrimaryTerm(), shard); assertThat(op.version(), equalTo(primaryResponse.getVersion())); assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo())); assertThat(op.versionType(), equalTo(VersionType.EXTERNAL)); + assertThat(op.primaryTerm(), equalTo(shard.getPrimaryTerm())); closeShards(shard); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e72f68e1d2b..9f4a33ada6e 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2730,7 +2730,7 @@ public class InternalEngineTests extends ESTestCase { new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); @@ -3015,8 +3015,8 @@ public class InternalEngineTests extends ESTestCase { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); + operation = appendOnlyPrimary(doc, false, 1); + retry = appendOnlyPrimary(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertNotNull(indexResult.getTranslogLocation()); @@ -3328,10 +3328,11 @@ public class InternalEngineTests extends ESTestCase { int numDocs = randomIntBetween(1000, 10000); assertEquals(0, engine.getNumVersionLookups()); assertEquals(0, engine.getNumIndexVersionsLookups()); + boolean primary = randomBoolean(); List docs = new ArrayList<>(); for (int i = 0; i < numDocs; i++) { final ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index index = randomAppendOnly(doc, false, i); + Engine.Index index = primary ? appendOnlyPrimary(doc, false, i) : appendOnlyReplica(doc, false, i, i); docs.add(index); } Collections.shuffle(docs, random()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index daaff4b1fc4..a113132351b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -122,8 +122,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.LongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -338,11 +336,12 @@ public class IndexShardTests extends IndexShardTestCase { public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; - + final boolean engineClosed; switch (randomInt(2)) { case 0: // started replica indexShard = newStartedShard(false); + engineClosed = false; break; case 1: { // initializing replica / primary @@ -353,6 +352,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRoutingState.INITIALIZING, relocating ? AllocationId.newRelocation(AllocationId.newInitializing()) : AllocationId.newInitializing()); indexShard = newShard(routing); + engineClosed = true; break; } case 2: { @@ -363,6 +363,7 @@ public class IndexShardTests extends IndexShardTestCase { true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); indexShard.updateRoutingEntry(routing); indexShard.relocated("test"); + engineClosed = false; break; } default: @@ -380,6 +381,7 @@ public class IndexShardTests extends IndexShardTestCase { } final long primaryTerm = indexShard.getPrimaryTerm(); + final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration; final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); assertEquals(1, indexShard.getActiveOperationsCount()); @@ -414,8 +416,9 @@ public class IndexShardTests extends IndexShardTestCase { { final AtomicBoolean onResponse = new AtomicBoolean(); - final AtomicBoolean onFailure = new AtomicBoolean(); + final AtomicReference onFailure = new AtomicReference<>(); final CyclicBarrier barrier = new CyclicBarrier(2); + final long newPrimaryTerm = primaryTerm + 1 + randomInt(20); // but you can not increment with a new primary term until the operations on the older primary term complete final Thread thread = new Thread(() -> { try { @@ -424,23 +427,29 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } indexShard.acquireReplicaOperationPermit( - primaryTerm + 1 + randomInt(20), + newPrimaryTerm, new ActionListener() { @Override public void onResponse(Releasable releasable) { + assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); onResponse.set(true); releasable.close(); + finish(); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(e); + finish(); + } + + private void finish() { try { barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } } - - @Override - public void onFailure(Exception e) { - onFailure.set(true); - } }, ThreadPool.Names.SAME); }); @@ -448,16 +457,25 @@ public class IndexShardTests extends IndexShardTestCase { barrier.await(); // our operation should be blocked until the previous operations complete assertFalse(onResponse.get()); - assertFalse(onFailure.get()); + assertNull(onFailure.get()); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); Releasables.close(operation1); // our operation should still be blocked assertFalse(onResponse.get()); - assertFalse(onFailure.get()); + assertNull(onFailure.get()); + assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); Releasables.close(operation2); barrier.await(); // now lock acquisition should have succeeded - assertTrue(onResponse.get()); - assertFalse(onFailure.get()); + assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + if (engineClosed) { + assertFalse(onResponse.get()); + assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); + } else { + assertTrue(onResponse.get()); + assertNull(onFailure.get()); + assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + } thread.join(); assertEquals(0, indexShard.getActiveOperationsCount()); } @@ -1046,7 +1064,7 @@ public class IndexShardTests extends IndexShardTestCase { test = otherShard.prepareIndexOnReplica( SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), XContentType.JSON), - 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + 1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); otherShard.index(test); final ShardRouting primaryShardRouting = shard.routingEntry(); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 26e8aa2ea18..0021139adb9 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -248,12 +248,12 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(ops.size())); - addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", newUid("2"))); + addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", 1, newUid("2"))); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(ops.size())); @@ -316,7 +316,7 @@ public class TranslogTests extends ESTestCase { assertThat(stats.estimatedNumberOfOperations(), equalTo(0L)); } assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC))); - translog.add(new Translog.Index("test", "1", new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); { final TranslogStats stats = stats(); @@ -324,23 +324,21 @@ public class TranslogTests extends ESTestCase { assertThat(stats.getTranslogSizeInBytes(), equalTo(97L)); } - translog.add(new Translog.Delete("test", "2", newUid("2"))); + translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(139L)); } - translog.add(new Translog.Delete("test", "3", newUid("3"))); + translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(181L)); } - final long seqNo = 1; - final long primaryTerm = 1; - translog.add(new Translog.NoOp(seqNo, primaryTerm, randomAlphaOfLength(16))); + translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); @@ -416,7 +414,7 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); @@ -436,15 +434,15 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); Translog.Snapshot snapshot1 = translog.newSnapshot(); - addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{2})); assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); translog.prepareCommit(); - addToTranslogAndList(translog, ops, new Translog.Index("test", "3", new byte[]{3})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3})); try (Translog.View view = translog.newView()) { Translog.Snapshot snapshot2 = translog.newSnapshot(); @@ -456,7 +454,7 @@ public class TranslogTests extends ESTestCase { public void testSnapshotOnClosedTranslog() throws IOException { assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1)))); - translog.add(new Translog.Index("test", "1", new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); translog.close(); try { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -501,10 +499,11 @@ public class TranslogTests extends ESTestCase { Thread[] threads = new Thread[threadCount]; final Exception[] threadExceptions = new Exception[threadCount]; + final AtomicLong seqNoGenerator = new AtomicLong(); final CountDownLatch downLatch = new CountDownLatch(1); for (int i = 0; i < threadCount; i++) { final int threadId = i; - threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions); + threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions); threads[i].setDaemon(true); threads[i].start(); } @@ -566,7 +565,7 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { String ascii = randomAlphaOfLengthBetween(1, 50); - locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8")))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8")))); } translog.sync(); @@ -592,7 +591,7 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { String ascii = randomAlphaOfLengthBetween(1, 50); - locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8")))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8")))); } translog.sync(); @@ -655,7 +654,7 @@ public class TranslogTests extends ESTestCase { public void testVerifyTranslogIsNotDeleted() throws IOException { assertFileIsPresent(translog, 1); - translog.add(new Translog.Index("test", "1", new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(1)); assertFileIsPresent(translog, 1); @@ -702,13 +701,13 @@ public class TranslogTests extends ESTestCase { switch (type) { case CREATE: case INDEX: - op = new Translog.Index("type", "" + id, new byte[]{(byte) id}); + op = new Translog.Index("type", "" + id, id, new byte[]{(byte) id}); break; case DELETE: - op = new Translog.Delete("test", Long.toString(id), newUid(Long.toString(id))); + op = new Translog.Delete("test", Long.toString(id), id, newUid(Long.toString(id))); break; case NO_OP: - op = new Translog.NoOp(id, id, Long.toString(id)); + op = new Translog.NoOp(id, 1, Long.toString(id)); break; default: throw new AssertionError("unsupported operation type [" + type + "]"); @@ -853,12 +852,15 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(10, 100); int count = 0; for (int op = 0; op < translogOperations; op++) { - final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + int seqNo = ++count; + final Translog.Location location = + translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); if (randomBoolean()) { assertTrue("at least one operation pending", translog.syncNeeded()); assertTrue("this operation has not been synced", translog.ensureSynced(location)); assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced - translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + seqNo = ++count; + translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); assertTrue("one pending operation", translog.syncNeeded()); assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now assertTrue("we only synced a previous operation yet", translog.syncNeeded()); @@ -886,7 +888,8 @@ public class TranslogTests extends ESTestCase { if (rarely()) { translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry } - final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + final Translog.Location location = + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); locations.add(location); } Collections.shuffle(locations, random()); @@ -913,7 +916,8 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(10, 100); int count = 0; for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); + locations.add( + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { translog.commit(translog.currentFileGeneration()); } @@ -949,7 +953,7 @@ public class TranslogTests extends ESTestCase { int lastSynced = -1; long lastSyncedGlobalCheckpoint = globalCheckpoint.get(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16)); } @@ -960,7 +964,8 @@ public class TranslogTests extends ESTestCase { } } assertEquals(translogOperations, translog.totalOperations()); - translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index( + "test", "" + translogOperations, translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { @@ -1077,7 +1082,7 @@ public class TranslogTests extends ESTestCase { int minUncommittedOp = -1; final boolean commitOften = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { translog.commit(translog.currentFileGeneration()); @@ -1116,7 +1121,7 @@ public class TranslogTests extends ESTestCase { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.prepareCommit(); @@ -1139,7 +1144,7 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); } } if (randomBoolean()) { // recover twice @@ -1152,7 +1157,7 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); } } } @@ -1166,7 +1171,7 @@ public class TranslogTests extends ESTestCase { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.prepareCommit(); @@ -1193,7 +1198,7 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); } } @@ -1208,7 +1213,7 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); } } } @@ -1221,7 +1226,7 @@ public class TranslogTests extends ESTestCase { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.prepareCommit(); @@ -1240,7 +1245,9 @@ public class TranslogTests extends ESTestCase { try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage()); + assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " + + "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, " + + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { @@ -1252,7 +1259,7 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); } } } @@ -1262,7 +1269,7 @@ public class TranslogTests extends ESTestCase { List ops = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { - Translog.Index test = new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))); + Translog.Index test = new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))); ops.add(test); } Translog.writeOperations(out, ops); @@ -1277,8 +1284,8 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(10, 100); try (Translog translog2 = create(createTempDir())) { for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); - locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations2.add(translog2.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); } int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { @@ -1304,7 +1311,7 @@ public class TranslogTests extends ESTestCase { int translogOperations = randomIntBetween(1, 10); int firstUncommitted = 0; for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { translog.commit(translog.currentFileGeneration()); firstUncommitted = op + 1; @@ -1333,13 +1340,13 @@ public class TranslogTests extends ESTestCase { } public void testFailOnClosedWrite() throws IOException { - translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); try { - translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); fail("closed"); } catch (AlreadyClosedException ex) { - // all is welll + // all is well } } @@ -1353,9 +1360,10 @@ public class TranslogTests extends ESTestCase { Thread[] threads = new Thread[threadCount]; final Exception[] threadExceptions = new Exception[threadCount]; final CountDownLatch downLatch = new CountDownLatch(1); + final AtomicLong seqNoGenerator = new AtomicLong(); for (int i = 0; i < threadCount; i++) { final int threadId = i; - threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions); + threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions); threads[i].setDaemon(true); threads[i].start(); } @@ -1380,13 +1388,16 @@ public class TranslogTests extends ESTestCase { private final Collection writtenOperations; private final Exception[] threadExceptions; private final Translog translog; + private final AtomicLong seqNoGenerator; - TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection writtenOperations, Exception[] threadExceptions) { + TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, + Collection writtenOperations, AtomicLong seqNoGenerator, Exception[] threadExceptions) { this.translog = translog; this.downLatch = downLatch; this.opsPerThread = opsPerThread; this.threadId = threadId; this.writtenOperations = writtenOperations; + this.seqNoGenerator = seqNoGenerator; this.threadExceptions = threadExceptions; } @@ -1400,20 +1411,20 @@ public class TranslogTests extends ESTestCase { switch (type) { case CREATE: case INDEX: - op = new Translog.Index("test", threadId + "_" + opCount, + op = new Translog.Index("test", threadId + "_" + opCount, seqNoGenerator.getAndIncrement(), randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); break; case DELETE: op = new Translog.Delete( "test", threadId + "_" + opCount, new Term("_uid", threadId + "_" + opCount), - opCount, + seqNoGenerator.getAndIncrement(), 0, 1 + randomInt(100000), randomFrom(VersionType.values())); break; case NO_OP: - op = new Translog.NoOp(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(16)); + op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), randomNonNegativeLong(), randomAlphaOfLength(16)); break; default: throw new AssertionError("unsupported operation type [" + type + "]"); @@ -1447,7 +1458,8 @@ public class TranslogTests extends ESTestCase { boolean failed = false; while (failed == false) { try { - locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add( + new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); translog.sync(); opsSynced++; } catch (MockDirectoryWrapper.FakeIOException ex) { @@ -1467,7 +1479,8 @@ public class TranslogTests extends ESTestCase { fail.failNever(); if (randomBoolean()) { try { - locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add( + new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); fail("we are already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -1517,9 +1530,10 @@ public class TranslogTests extends ESTestCase { public void testTranslogOpsCountIsCorrect() throws IOException { List locations = new ArrayList<>(); int numOps = randomIntBetween(100, 200); - LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly + LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { - locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add( + new Translog.Index("test", "" + opsAdded, opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); Translog.Snapshot snapshot = this.translog.newSnapshot(); assertEquals(opsAdded + 1, snapshot.totalOperations()); for (int i = 0; i < opsAdded; i++) { @@ -1536,10 +1550,11 @@ public class TranslogTests extends ESTestCase { TranslogConfig config = getTranslogConfig(tempDir); Translog translog = getFailableTranslog(fail, config, false, true, null); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly - translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); try { - Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + Translog.Location location = translog.add( + new Translog.Index("test", "2", 1, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); if (randomBoolean()) { translog.ensureSynced(location); } else { @@ -1568,10 +1583,11 @@ public class TranslogTests extends ESTestCase { final Exception[] threadExceptions = new Exception[threadCount]; final CountDownLatch downLatch = new CountDownLatch(1); final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100)); + final AtomicLong seqNoGenerator = new AtomicLong(); List writtenOperations = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < threadCount; i++) { final int threadId = i; - threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) { + threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, seqNoGenerator, threadExceptions) { @Override protected Translog.Location add(Translog.Operation op) throws IOException { Translog.Location add = super.add(op); @@ -1794,7 +1810,7 @@ public class TranslogTests extends ESTestCase { Path tempDir = createTempDir(); TranslogConfig config = getTranslogConfig(tempDir); Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration generation = translog.getGeneration(); translog.close(); try { @@ -1812,7 +1828,7 @@ public class TranslogTests extends ESTestCase { } public void testRecoverWithUnbackedNextGen() throws IOException { - translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); @@ -1830,7 +1846,7 @@ public class TranslogTests extends ESTestCase { assertNotNull("operation " + i + " must be non-null", next); assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); } - tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertNotNull(translogGeneration); @@ -1845,7 +1861,7 @@ public class TranslogTests extends ESTestCase { } public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { - translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); @@ -1865,7 +1881,7 @@ public class TranslogTests extends ESTestCase { } public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { - translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); TranslogConfig config = translog.getConfig(); @@ -1885,7 +1901,7 @@ public class TranslogTests extends ESTestCase { assertNotNull("operation " + i + " must be non-null", next); assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); } - tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } try { @@ -1923,7 +1939,7 @@ public class TranslogTests extends ESTestCase { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { String doc = lineFileDocs.nextDoc().toString(); - failableTLog.add(new Translog.Index("test", "" + opsAdded, doc.getBytes(Charset.forName("UTF-8")))); + failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, doc.getBytes(Charset.forName("UTF-8")))); unsynced.add(doc); if (randomBoolean()) { failableTLog.sync(); @@ -2034,16 +2050,16 @@ public class TranslogTests extends ESTestCase { * Tests that closing views after the translog is fine and we can reopen the translog */ public void testPendingDelete() throws IOException { - translog.add(new Translog.Index("test", "1", new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); translog.prepareCommit(); Translog.TranslogGeneration generation = translog.getGeneration(); TranslogConfig config = translog.getConfig(); translog.close(); translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - translog.add(new Translog.Index("test", "2", new byte[]{2})); + translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); translog.prepareCommit(); Translog.View view = translog.newView(); - translog.add(new Translog.Index("test", "3", new byte[]{3})); + translog.add(new Translog.Index("test", "3", 2, new byte[]{3})); translog.close(); IOUtils.close(view); translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); @@ -2197,7 +2213,7 @@ public class TranslogTests extends ESTestCase { for (final Long seqNo : shuffledSeqNos) { seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L))); Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList())); - seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1))); + seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.get(repeatingTermSeqNo))); } for (final Tuple tuple : seqNos) { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 1c588caadcd..4f0fec4c85e 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -61,7 +61,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { for (int i = 0; i < docs; i++) { Engine.Index indexOp = replica.prepareIndexOnReplica( SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), - seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + seqNo++, replica.getPrimaryTerm(), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); replica.index(indexOp); if (rarely()) { // insert a gap diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index dbfa77635ab..515e01c0409 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -489,7 +489,8 @@ public abstract class IndexShardTestCase extends ESTestCase { index = shard.prepareIndexOnReplica( SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType), - randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0, + VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } shard.index(index); return index;