Replace 0L with an UNASSIGNED_PRIMARY_TERM constant (#36819)

* Replace 0L with an UNASSIGNED_PRIMARY_TERM constant

0 is an illegal value for a primary term that is often used to indicate
the primary term isn't assigned or is irrelevant. This PR replaces the
usage of 0 with a constant, to improve readability and so it can be
tracked and if needed, replaced.

* feedback
This commit is contained in:
Boaz Leskes 2018-12-19 13:15:05 +01:00 committed by GitHub
parent 978713a67c
commit 216b154107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 94 additions and 70 deletions

View File

@ -43,6 +43,8 @@ import java.net.URLEncoder;
import java.util.Locale;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/**
* A base class for the response of a write operation that involves a single doc
@ -266,8 +268,8 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
primaryTerm = 0;
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
@ -378,8 +380,8 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
protected Result result = null;
protected boolean forcedRefresh;
protected ShardInfo shardInfo = null;
protected Long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
protected Long primaryTerm = 0L;
protected Long seqNo = UNASSIGNED_SEQ_NO;
protected Long primaryTerm = UNASSIGNED_PRIMARY_TERM;
public ShardId getShardId() {
return shardId;

View File

@ -56,6 +56,7 @@ import java.util.Objects;
import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/**
* A bulk request holds an ordered {@link IndexRequest}s, {@link DeleteRequest}s and {@link UpdateRequest}s
@ -351,7 +352,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = 0;
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);

View File

@ -31,12 +31,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/**
* A request to delete a document from an index based on its type and id. Best created using
@ -58,8 +59,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private String routing;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public DeleteRequest() {
}
@ -116,16 +117,16 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
@ -239,7 +240,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
@ -286,8 +287,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}
@ -305,7 +306,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +

View File

@ -43,7 +43,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@ -54,6 +53,8 @@ import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/**
* Index request to index a typed JSON document into a specific index and make it searchable. Best
@ -106,8 +107,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
private boolean isRetry = false;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public IndexRequest() {
@ -174,7 +175,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
return validationException;
}
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
validationException = addValidationError("create operations do not support compare and set. use index instead",
validationException);
return validationException;
@ -207,15 +208,15 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
@ -511,7 +512,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public IndexRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
@ -573,8 +574,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
// generate id if not already provided
if (id == null) {
assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO;
assert ifPrimaryTerm == 0;
assert ifSeqNo == UNASSIGNED_SEQ_NO;
assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
String uid;
if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
@ -620,8 +621,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}
@ -657,7 +658,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +

View File

@ -87,6 +87,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/**
* Base class for requests that should be executed on a primary copy followed by replica copies.
* Subclasses can resolve the target shard and provide implementation for primary and replica operations.
@ -1248,7 +1250,7 @@ public abstract class TransportReplicationAction<
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
primaryTerm = 0L;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
@ -48,7 +49,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -74,6 +74,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
public class ShardStateAction {
private static final Logger logger = LogManager.getLogger(ShardStateAction.class);
@ -603,7 +605,7 @@ public class ShardStateAction {
allocationId = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
final long primaryTerm = in.readVLong();
assert primaryTerm == 0L : "shard is only started by itself: primary term [" + primaryTerm + "]";
assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]";
}
this.message = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {

View File

@ -55,6 +55,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -1079,6 +1080,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragmen
throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
}
primaryTerms = new long[numberOfShards()];
Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

View File

@ -33,10 +33,12 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqN
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/** Utility class to do efficient primary-key (only 1 doc contains the
* given term) lookups by segment, re-using the enums. This class is
@ -116,18 +118,18 @@ final class PerThreadIDVersionAndSeqNoLookup {
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
seqNo = UNASSIGNED_SEQ_NO;
}
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (terms != null && terms.advanceExact(docID)) {
term = terms.longValue();
} else {
term = 0;
term = UNASSIGNED_PRIMARY_TERM;
}
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
term = 0;
seqNo = UNASSIGNED_SEQ_NO;
term = UNASSIGNED_PRIMARY_TERM;
}
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
} else {
@ -175,7 +177,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
seqNo = UNASSIGNED_SEQ_NO;
}
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
if (isLive) {

View File

@ -72,7 +72,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -106,6 +105,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
@ -147,7 +149,7 @@ public abstract class Engine implements Closeable {
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
*/
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);
protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@ -425,8 +427,8 @@ public abstract class Engine implements Closeable {
protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
this.operationType = operationType;
this.version = Versions.NOT_FOUND;
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
this.term = 0L;
this.seqNo = UNASSIGNED_SEQ_NO;
this.term = UNASSIGNED_PRIMARY_TERM;
this.failure = null;
this.requiredMappingUpdate = requiredMappingUpdate;
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
@ -522,7 +524,7 @@ public abstract class Engine implements Closeable {
* use in case of the index operation failed before getting to internal engine
**/
public IndexResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
this(failure, version, term, UNASSIGNED_SEQ_NO);
}
public IndexResult(Exception failure, long version, long term, long seqNo) {
@ -554,7 +556,7 @@ public abstract class Engine implements Closeable {
* use in case of the delete operation failed before getting to internal engine
**/
public DeleteResult(Exception failure, long version, long term) {
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
this(failure, version, term, UNASSIGNED_SEQ_NO, false);
}
public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
@ -1353,9 +1355,9 @@ public abstract class Engine implements Closeable {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
assert (origin == Origin.PRIMARY) || (ifSeqNo == UNASSIGNED_SEQ_NO && ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.doc = doc;
this.isRetry = isRetry;
@ -1369,8 +1371,8 @@ public abstract class Engine implements Closeable {
} // TEST ONLY
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
this(uid, doc, UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
} // TEST ONLY
public ParsedDocument parsedDoc() {
@ -1447,9 +1449,9 @@ public abstract class Engine implements Closeable {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >=0 :
"ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTerm == 0) :
assert (origin == Origin.PRIMARY) || (ifSeqNo == UNASSIGNED_SEQ_NO && ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
@ -1458,13 +1460,13 @@ public abstract class Engine implements Closeable {
}
public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
this(type, id, uid, UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0);
}
public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
versionType, template.origin(), template.startTime(), UNASSIGNED_SEQ_NO, 0);
}
@Override

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
@ -48,6 +47,8 @@ import java.util.Objects;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class GetResult implements Streamable, Iterable<DocumentField>, ToXContentObject {
@ -82,9 +83,9 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
assert (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && primaryTerm == 0) || (seqNo >= 0 && primaryTerm >= 1) :
assert (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) || (seqNo >= 0 && primaryTerm >= 1) :
"seqNo: " + seqNo + " primaryTerm: " + primaryTerm;
assert exists || (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO && primaryTerm == 0) :
assert exists || (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) :
"doc not found but seqNo/primaryTerm are set";
this.version = version;
this.exists = exists;
@ -239,7 +240,7 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
}
public XContentBuilder toXContentEmbedded(XContentBuilder builder, Params params) throws IOException {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { // seqNo may not be assigned if read from an old node
if (seqNo != UNASSIGNED_SEQ_NO) { // seqNo may not be assigned if read from an old node
builder.field(_SEQ_NO, seqNo);
builder.field(_PRIMARY_TERM, primaryTerm);
}
@ -313,8 +314,8 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
String currentFieldName = parser.currentName();
long version = -1;
long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long primaryTerm = 0;
long seqNo = UNASSIGNED_SEQ_NO;
long primaryTerm = UNASSIGNED_PRIMARY_TERM;
Boolean found = null;
BytesReference source = null;
Map<String, DocumentField> fields = new HashMap<>();
@ -388,8 +389,8 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
primaryTerm = 0L;
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
version = in.readLong();
exists = in.readBoolean();

View File

@ -41,6 +41,11 @@ public class SequenceNumbers {
*/
public static final long NO_OPS_PERFORMED = -1L;
/**
* Represents an unassigned primary term (e.g., when a primary shard was not yet allocated)
*/
public static final long UNASSIGNED_PRIMARY_TERM = 0L;
/**
* Reads the sequence number stats from the commit data (maximum sequence number and local checkpoint).
*

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -113,7 +114,7 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
final Translog.Operation op = Translog.readOperation(inStream);
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) {
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
throw new TranslogCorruptedException(
path.toString(),
"operation's term is newer than translog header term; " +

View File

@ -34,6 +34,8 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
/**
* Each translog file is started with a translog header then followed by translog operations.
*/
@ -45,8 +47,6 @@ final class TranslogHeader {
public static final int VERSION_PRIMARY_TERM = 3; // added primary term
public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM;
public static final long UNKNOWN_PRIMARY_TERM = 0L;
private final String translogUUID;
private final long primaryTerm;
private final int headerSizeInBytes;
@ -146,7 +146,7 @@ final class TranslogHeader {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
primaryTerm = UNKNOWN_PRIMARY_TERM;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
// Verify the checksum
if (version >= VERSION_PRIMARY_TERM) {

View File

@ -207,7 +207,7 @@ public class TruncateTranslogAction {
*/
private static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) {
TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM);
TranslogHeader header = new TranslogHeader(translogUUID, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
header.write(fc);
return header.sizeInBytes();
}

View File

@ -172,6 +172,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
@ -1801,7 +1802,7 @@ public class InternalEngineTests extends EngineTestCase {
int opsPerformed = 0;
long lastOpVersion = currentOpVersion;
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
long lastOpTerm = 0;
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
final AtomicLong currentTerm = new AtomicLong(1);
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
@ -1900,7 +1901,7 @@ public class InternalEngineTests extends EngineTestCase {
docDeleted = true;
lastOpVersion = result.getVersion();
lastOpSeqNo = UNASSIGNED_SEQ_NO;
lastOpTerm = 0;
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
opsPerformed++;
}
}
@ -4309,7 +4310,7 @@ public class InternalEngineTests extends EngineTestCase {
final long seqNo;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid());
if (docIdAndSeqNo == null) {
primaryTerm = 0;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
seqNo = UNASSIGNED_SEQ_NO;
} else {
seqNo = docIdAndSeqNo.seqNo;

View File

@ -44,6 +44,7 @@ import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.get.DocumentFieldTests.randomDocumentField;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
@ -209,7 +210,7 @@ public class GetResultTests extends ESTestCase {
}
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = 0;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
version = -1;
exists = false;
}

View File

@ -83,7 +83,7 @@ public class TranslogHeaderTests extends ESTestCase {
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
final TranslogHeader inHeader = TranslogHeader.read(translogUUID, translogFile, channel);
assertThat(inHeader.getTranslogUUID(), equalTo(translogUUID));
assertThat(inHeader.getPrimaryTerm(), equalTo(TranslogHeader.UNKNOWN_PRIMARY_TERM));
assertThat(inHeader.getPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM));
assertThat(inHeader.sizeInBytes(), equalTo((int)channel.position()));
}
expectThrows(TranslogCorruptedException.class, () -> {