Guarantee that translog generations are seqNo conflict free (#24825)

With #24779 in place, we can now guaranteed that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can a occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica.

PS. Knowing that each generation file is conflict free will simplifying repairing these conflicts when we read from the translog.

PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a result of the legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow up.

Relates to #10708
This commit is contained in:
Boaz Leskes 2017-05-24 13:26:39 +02:00 committed by GitHub
parent ac6a6d6fe8
commit 6bc5b1dbcd
11 changed files with 205 additions and 121 deletions

View File

@ -446,6 +446,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
final long primaryTerm = request.primaryTerm();
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
@ -457,10 +458,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica);
operationResult =
executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, primaryTerm, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica);
operationResult =
executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, primaryTerm, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
@ -528,14 +531,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
private static Engine.IndexResult executeIndexRequestOnReplica(
DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request,
long primaryTerm, IndexShard replica) throws IOException {
final Engine.Index operation;
try {
operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
operation = prepareIndexOperationOnReplica(primaryResponse, request, primaryTerm, replica);
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
}
@ -553,6 +554,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
static Engine.Index prepareIndexOperationOnReplica(
DocWriteResponse primaryResponse,
IndexRequest request,
long primaryTerm,
IndexShard replica) {
final ShardId shardId = replica.shardId();
@ -565,7 +567,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
assert versionType.validateVersionForWrites(version);
return replica.prepareIndexOnReplica(sourceToParse, seqNo, version, versionType,
return replica.prepareIndexOnReplica(sourceToParse, seqNo, primaryTerm, version, versionType,
request.getAutoGeneratedTimestamp(), request.isRetry());
}
@ -647,7 +649,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
IndexShard replica) throws Exception {
final long primaryTerm, IndexShard replica) throws Exception {
if (replica.indexSettings().isSingleType()) {
// We need to wait for the replica to have the mappings
Mapping update;
@ -667,7 +669,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final long version = primaryResponse.getVersion();
assert versionType.validateVersionForWrites(version);
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
primaryResponse.getSeqNo(), primaryTerm, version, versionType);
return replica.delete(delete);
}

View File

@ -522,11 +522,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long opSeqNo, long opPrimaryTerm, long version, VersionType versionType,
long autoGeneratedIdTimestamp, boolean isRetry) {
try {
verifyReplicationTarget();
return prepareIndex(docMapper(source.type()), source, seqNo, primaryTerm, version, versionType,
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
return prepareIndex(docMapper(source.type()), source, opSeqNo, opPrimaryTerm, version, versionType,
Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry);
} catch (Exception e) {
verifyNotClosed(e);
@ -594,11 +595,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
versionType, Engine.Operation.Origin.PRIMARY);
}
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
public Engine.Delete prepareDeleteOnReplica(String type, String id, long opSeqNo, long opPrimaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
final Term uid = extractUidForDelete(type, id);
return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
return prepareDelete(type, id, uid, opSeqNo, opPrimaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
}
private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
@ -1875,10 +1877,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (operationPrimaryTerm > primaryTerm) {
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
getEngine().getTranslog().rollGeneration();
});
} catch (final InterruptedException | TimeoutException e) {
} catch (final Exception e) {
onPermitAcquired.onFailure(e);
return;
}

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
@ -70,7 +71,8 @@ final class IndexShardOperationPermits implements Closeable {
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation permit has been closed
*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
InterruptedException, TimeoutException, E {
if (closed) {
throw new IndexShardClosedException(shardId);
}
@ -109,9 +111,9 @@ final class IndexShardOperationPermits implements Closeable {
/**
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* longer blocked.
* {@link ActionListener} will be called on the calling thread. During calls of
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
* then be called using the provided executor once operations are no longer blocked.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call

View File

@ -910,11 +910,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}
public Index(String type, String id, byte[] source) {
public Index(String type, String id, long seqNo, byte[] source) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = 0;
this.seqNo = seqNo;
version = Versions.MATCH_ANY;
versionType = VersionType.INTERNAL;
routing = null;
@ -1037,9 +1037,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public String toString() {
return "Index{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
'}';
"id='" + id + '\'' +
", type='" + type + '\'' +
", seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
'}';
}
public long getAutoGeneratedIdTimestamp() {
@ -1079,8 +1081,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/** utility for testing */
public Delete(String type, String id, Term uid) {
this(type, id, uid, 0, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
public Delete(String type, String id, long seqNo, Term uid) {
this(type, id, uid, seqNo, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
}
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
@ -1180,10 +1182,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public String toString() {
return "Delete{" +
"uid=" + uid +
'}';
"uid=" + uid +
", seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
'}';
}
}
public static class NoOp implements Operation {
@ -1260,9 +1263,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
}
@Override
public String toString() {
return "NoOp{" +
"seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
", reason='" + reason + '\'' +
'}';
}
}
public enum Durability {
/**

View File

@ -24,7 +24,10 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -39,6 +42,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
@ -71,6 +76,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object();
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
private TranslogWriter(
final ChannelFactory channelFactory,
final ShardId shardId,
@ -90,6 +97,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
}
static int getHeaderLength(String translogUUID) {
@ -195,9 +203,30 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
operationCounter++;
assert assertNoSeqNumberConflict(seqNo, data);
return new Translog.Location(generation, offset, data.length());
}
private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// nothing to do
} else if (seenSequenceNumbers.containsKey(seqNo)) {
final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
if (previous.v1().equals(data) == false) {
Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput()));
Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput()));
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
}
} else {
seenSequenceNumbers.put(seqNo,
new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op")));
}
return true;
}
/**
* write all buffered ops to disk and fsync file.
*

View File

@ -631,16 +631,17 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(false);
DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, randomBoolean());
DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 17, 0, 1, randomBoolean());
IndexRequest request = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "field", "value");
Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica(
primaryResponse, request, shard);
primaryResponse, request, shard.getPrimaryTerm(), shard);
assertThat(op.version(), equalTo(primaryResponse.getVersion()));
assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo()));
assertThat(op.versionType(), equalTo(VersionType.EXTERNAL));
assertThat(op.primaryTerm(), equalTo(shard.getPrimaryTerm()));
closeShards(shard);
}

View File

@ -2730,7 +2730,7 @@ public class InternalEngineTests extends ESTestCase {
new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
null,
() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();
@ -3015,8 +3015,8 @@ public class InternalEngineTests extends ESTestCase {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
operation = randomAppendOnly(doc, false, 1);
retry = randomAppendOnly(doc, true, 1);
operation = appendOnlyPrimary(doc, false, 1);
retry = appendOnlyPrimary(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
@ -3328,10 +3328,11 @@ public class InternalEngineTests extends ESTestCase {
int numDocs = randomIntBetween(1000, 10000);
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
boolean primary = randomBoolean();
List<Engine.Index> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index index = randomAppendOnly(doc, false, i);
Engine.Index index = primary ? appendOnlyPrimary(doc, false, i) : appendOnlyReplica(doc, false, i, i);
docs.add(index);
}
Collections.shuffle(docs, random());

View File

@ -122,8 +122,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -338,11 +336,12 @@ public class IndexShardTests extends IndexShardTestCase {
public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
final boolean engineClosed;
switch (randomInt(2)) {
case 0:
// started replica
indexShard = newStartedShard(false);
engineClosed = false;
break;
case 1: {
// initializing replica / primary
@ -353,6 +352,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.INITIALIZING,
relocating ? AllocationId.newRelocation(AllocationId.newInitializing()) : AllocationId.newInitializing());
indexShard = newShard(routing);
engineClosed = true;
break;
}
case 2: {
@ -363,6 +363,7 @@ public class IndexShardTests extends IndexShardTestCase {
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
indexShard.relocated("test");
engineClosed = false;
break;
}
default:
@ -380,6 +381,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
final long primaryTerm = indexShard.getPrimaryTerm();
final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration;
final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
assertEquals(1, indexShard.getActiveOperationsCount());
@ -414,8 +416,9 @@ public class IndexShardTests extends IndexShardTestCase {
{
final AtomicBoolean onResponse = new AtomicBoolean();
final AtomicBoolean onFailure = new AtomicBoolean();
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(2);
final long newPrimaryTerm = primaryTerm + 1 + randomInt(20);
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@ -424,23 +427,29 @@ public class IndexShardTests extends IndexShardTestCase {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + 1 + randomInt(20),
newPrimaryTerm,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
onResponse.set(true);
releasable.close();
finish();
}
@Override
public void onFailure(Exception e) {
onFailure.set(e);
finish();
}
private void finish() {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(Exception e) {
onFailure.set(true);
}
},
ThreadPool.Names.SAME);
});
@ -448,16 +457,25 @@ public class IndexShardTests extends IndexShardTestCase {
barrier.await();
// our operation should be blocked until the previous operations complete
assertFalse(onResponse.get());
assertFalse(onFailure.get());
assertNull(onFailure.get());
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
Releasables.close(operation1);
// our operation should still be blocked
assertFalse(onResponse.get());
assertFalse(onFailure.get());
assertNull(onFailure.get());
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
Releasables.close(operation2);
barrier.await();
// now lock acquisition should have succeeded
assertTrue(onResponse.get());
assertFalse(onFailure.get());
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
if (engineClosed) {
assertFalse(onResponse.get());
assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
} else {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
}
thread.join();
assertEquals(0, indexShard.getActiveOperationsCount());
}
@ -1046,7 +1064,7 @@ public class IndexShardTests extends IndexShardTestCase {
test = otherShard.prepareIndexOnReplica(
SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(),
XContentType.JSON),
1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
otherShard.index(test);
final ShardRouting primaryShardRouting = shard.routingEntry();

View File

@ -248,12 +248,12 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", newUid("2")));
addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", 1, newUid("2")));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
@ -316,7 +316,7 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
}
assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC)));
translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));
{
final TranslogStats stats = stats();
@ -324,23 +324,21 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.getTranslogSizeInBytes(), equalTo(97L));
}
translog.add(new Translog.Delete("test", "2", newUid("2")));
translog.add(new Translog.Delete("test", "2", 1, newUid("2")));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(139L));
}
translog.add(new Translog.Delete("test", "3", newUid("3")));
translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
assertThat(stats.getTranslogSizeInBytes(), equalTo(181L));
}
final long seqNo = 1;
final long primaryTerm = 1;
translog.add(new Translog.NoOp(seqNo, primaryTerm, randomAlphaOfLength(16)));
translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
@ -416,7 +414,7 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
@ -436,15 +434,15 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}));
Translog.Snapshot snapshot1 = translog.newSnapshot();
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{2}));
assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0)));
translog.prepareCommit();
addToTranslogAndList(translog, ops, new Translog.Index("test", "3", new byte[]{3}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3}));
try (Translog.View view = translog.newView()) {
Translog.Snapshot snapshot2 = translog.newSnapshot();
@ -456,7 +454,7 @@ public class TranslogTests extends ESTestCase {
public void testSnapshotOnClosedTranslog() throws IOException {
assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1))));
translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));
translog.close();
try {
Translog.Snapshot snapshot = translog.newSnapshot();
@ -501,10 +499,11 @@ public class TranslogTests extends ESTestCase {
Thread[] threads = new Thread[threadCount];
final Exception[] threadExceptions = new Exception[threadCount];
final AtomicLong seqNoGenerator = new AtomicLong();
final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
threads[i].setDaemon(true);
threads[i].start();
}
@ -566,7 +565,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAlphaOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@ -592,7 +591,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAlphaOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@ -655,7 +654,7 @@ public class TranslogTests extends ESTestCase {
public void testVerifyTranslogIsNotDeleted() throws IOException {
assertFileIsPresent(translog, 1);
translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(1));
assertFileIsPresent(translog, 1);
@ -702,13 +701,13 @@ public class TranslogTests extends ESTestCase {
switch (type) {
case CREATE:
case INDEX:
op = new Translog.Index("type", "" + id, new byte[]{(byte) id});
op = new Translog.Index("type", "" + id, id, new byte[]{(byte) id});
break;
case DELETE:
op = new Translog.Delete("test", Long.toString(id), newUid(Long.toString(id)));
op = new Translog.Delete("test", Long.toString(id), id, newUid(Long.toString(id)));
break;
case NO_OP:
op = new Translog.NoOp(id, id, Long.toString(id));
op = new Translog.NoOp(id, 1, Long.toString(id));
break;
default:
throw new AssertionError("unsupported operation type [" + type + "]");
@ -853,12 +852,15 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
int seqNo = ++count;
final Translog.Location location =
translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
if (randomBoolean()) {
assertTrue("at least one operation pending", translog.syncNeeded());
assertTrue("this operation has not been synced", translog.ensureSynced(location));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
seqNo = ++count;
translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
assertTrue("one pending operation", translog.syncNeeded());
assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
@ -886,7 +888,8 @@ public class TranslogTests extends ESTestCase {
if (rarely()) {
translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
}
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
final Translog.Location location =
translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
locations.add(location);
}
Collections.shuffle(locations, random());
@ -913,7 +916,8 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
locations.add(
translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op + 1) {
translog.commit(translog.currentFileGeneration());
}
@ -949,7 +953,7 @@ public class TranslogTests extends ESTestCase {
int lastSynced = -1;
long lastSyncedGlobalCheckpoint = globalCheckpoint.get();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16));
}
@ -960,7 +964,8 @@ public class TranslogTests extends ESTestCase {
}
}
assertEquals(translogOperations, translog.totalOperations());
translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index(
"test", "" + translogOperations, translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
@ -1077,7 +1082,7 @@ public class TranslogTests extends ESTestCase {
int minUncommittedOp = -1;
final boolean commitOften = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
translog.commit(translog.currentFileGeneration());
@ -1116,7 +1121,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
@ -1139,7 +1144,7 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
}
if (randomBoolean()) { // recover twice
@ -1152,7 +1157,7 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
}
}
@ -1166,7 +1171,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
@ -1193,7 +1198,7 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
}
@ -1208,7 +1213,7 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
}
}
@ -1221,7 +1226,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
@ -1240,7 +1245,9 @@ public class TranslogTests extends ESTestCase {
try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " +
"numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
}
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
@ -1252,7 +1259,7 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
}
}
@ -1262,7 +1269,7 @@ public class TranslogTests extends ESTestCase {
List<Translog.Operation> ops = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
Translog.Index test = new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
Translog.Index test = new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
ops.add(test);
}
Translog.writeOperations(out, ops);
@ -1277,8 +1284,8 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
try (Translog translog2 = create(createTempDir())) {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations2.add(translog2.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
}
int iters = randomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
@ -1304,7 +1311,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(1, 10);
int firstUncommitted = 0;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
translog.commit(translog.currentFileGeneration());
firstUncommitted = op + 1;
@ -1333,13 +1340,13 @@ public class TranslogTests extends ESTestCase {
}
public void testFailOnClosedWrite() throws IOException {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.close();
try {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
fail("closed");
} catch (AlreadyClosedException ex) {
// all is welll
// all is well
}
}
@ -1353,9 +1360,10 @@ public class TranslogTests extends ESTestCase {
Thread[] threads = new Thread[threadCount];
final Exception[] threadExceptions = new Exception[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
final AtomicLong seqNoGenerator = new AtomicLong();
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
threads[i].setDaemon(true);
threads[i].start();
}
@ -1380,13 +1388,16 @@ public class TranslogTests extends ESTestCase {
private final Collection<LocationOperation> writtenOperations;
private final Exception[] threadExceptions;
private final Translog translog;
private final AtomicLong seqNoGenerator;
TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection<LocationOperation> writtenOperations, Exception[] threadExceptions) {
TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId,
Collection<LocationOperation> writtenOperations, AtomicLong seqNoGenerator, Exception[] threadExceptions) {
this.translog = translog;
this.downLatch = downLatch;
this.opsPerThread = opsPerThread;
this.threadId = threadId;
this.writtenOperations = writtenOperations;
this.seqNoGenerator = seqNoGenerator;
this.threadExceptions = threadExceptions;
}
@ -1400,20 +1411,20 @@ public class TranslogTests extends ESTestCase {
switch (type) {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
op = new Translog.Index("test", threadId + "_" + opCount, seqNoGenerator.getAndIncrement(),
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(
"test", threadId + "_" + opCount,
new Term("_uid", threadId + "_" + opCount),
opCount,
seqNoGenerator.getAndIncrement(),
0,
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
case NO_OP:
op = new Translog.NoOp(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(16));
op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), randomNonNegativeLong(), randomAlphaOfLength(16));
break;
default:
throw new AssertionError("unsupported operation type [" + type + "]");
@ -1447,7 +1458,8 @@ public class TranslogTests extends ESTestCase {
boolean failed = false;
while (failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(
new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
translog.sync();
opsSynced++;
} catch (MockDirectoryWrapper.FakeIOException ex) {
@ -1467,7 +1479,8 @@ public class TranslogTests extends ESTestCase {
fail.failNever();
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(
new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
fail("we are already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
@ -1517,9 +1530,10 @@ public class TranslogTests extends ESTestCase {
public void testTranslogOpsCountIsCorrect() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int numOps = randomIntBetween(100, 200);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(
new Translog.Index("test", "" + opsAdded, opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
Translog.Snapshot snapshot = this.translog.newSnapshot();
assertEquals(opsAdded + 1, snapshot.totalOperations());
for (int i = 0; i < opsAdded; i++) {
@ -1536,10 +1550,11 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = getFailableTranslog(fail, config, false, true, null);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
fail.failAlways();
try {
Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
Translog.Location location = translog.add(
new Translog.Index("test", "2", 1, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
if (randomBoolean()) {
translog.ensureSynced(location);
} else {
@ -1568,10 +1583,11 @@ public class TranslogTests extends ESTestCase {
final Exception[] threadExceptions = new Exception[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100));
final AtomicLong seqNoGenerator = new AtomicLong();
List<LocationOperation> writtenOperations = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) {
threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, seqNoGenerator, threadExceptions) {
@Override
protected Translog.Location add(Translog.Operation op) throws IOException {
Translog.Location add = super.add(op);
@ -1794,7 +1810,7 @@ public class TranslogTests extends ESTestCase {
Path tempDir = createTempDir();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration generation = translog.getGeneration();
translog.close();
try {
@ -1812,7 +1828,7 @@ public class TranslogTests extends ESTestCase {
}
public void testRecoverWithUnbackedNextGen() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
@ -1830,7 +1846,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration);
@ -1845,7 +1861,7 @@ public class TranslogTests extends ESTestCase {
}
public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
@ -1865,7 +1881,7 @@ public class TranslogTests extends ESTestCase {
}
public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
@ -1885,7 +1901,7 @@ public class TranslogTests extends ESTestCase {
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString()));
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try {
@ -1923,7 +1939,7 @@ public class TranslogTests extends ESTestCase {
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
String doc = lineFileDocs.nextDoc().toString();
failableTLog.add(new Translog.Index("test", "" + opsAdded, doc.getBytes(Charset.forName("UTF-8"))));
failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, doc.getBytes(Charset.forName("UTF-8"))));
unsynced.add(doc);
if (randomBoolean()) {
failableTLog.sync();
@ -2034,16 +2050,16 @@ public class TranslogTests extends ESTestCase {
* Tests that closing views after the translog is fine and we can reopen the translog
*/
public void testPendingDelete() throws IOException {
translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));
translog.prepareCommit();
Translog.TranslogGeneration generation = translog.getGeneration();
TranslogConfig config = translog.getConfig();
translog.close();
translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "2", new byte[]{2}));
translog.add(new Translog.Index("test", "2", 1, new byte[]{2}));
translog.prepareCommit();
Translog.View view = translog.newView();
translog.add(new Translog.Index("test", "3", new byte[]{3}));
translog.add(new Translog.Index("test", "3", 2, new byte[]{3}));
translog.close();
IOUtils.close(view);
translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
@ -2197,7 +2213,7 @@ public class TranslogTests extends ESTestCase {
for (final Long seqNo : shuffledSeqNos) {
seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.get(repeatingTermSeqNo)));
}
for (final Tuple<Long, Long> tuple : seqNos) {

View File

@ -61,7 +61,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
for (int i = 0; i < docs; i++) {
Engine.Index indexOp = replica.prepareIndexOnReplica(
SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON),
seqNo++, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
seqNo++, replica.getPrimaryTerm(), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
replica.index(indexOp);
if (rarely()) {
// insert a gap

View File

@ -489,7 +489,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
index = shard.prepareIndexOnReplica(
SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source),
xContentType),
randomInt(1 << 10), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
shard.seqNoStats().getMaxSeqNo() + 1, shard.getPrimaryTerm(), 0,
VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.index(index);
return index;