Core: simultaneous create/delete against same id can cause silently inconsistent replica

If simultaneous create & delete operations arrive against the same id,
it's possible that primary and replica see those operations in
different orders, which may result in replica throwing
DocumentAlreadyExistsException when the primary didn't which would
lead to replica being inconsistent (missing a document that primary
had indexed).

This push fixes the issue, by never throwing DAEE from the replica on
create.

Closes #7146 #7142
This commit is contained in:
mikemccand 2014-08-04 09:14:09 -04:00
parent 8989d062cd
commit a58d9a1dd0
2 changed files with 21 additions and 20 deletions

View File

@ -46,7 +46,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -168,14 +167,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
Throwable cause = ExceptionsHelper.unwrapCause(e); Throwable cause = ExceptionsHelper.unwrapCause(e);
// on version conflict or document missing, it means // on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine // that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) { if (cause instanceof VersionConflictEngineException) {
return true; return true;
} }
// same here
if (cause instanceof DocumentAlreadyExistsException) {
return true;
}
return false; return false;
} }

View File

@ -445,31 +445,37 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} }
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion); updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
// if the doc does not exist or it exists but is not deleted // if the doc exists
if (versionValue != null) { boolean doUpdate = false;
if (!versionValue.delete()) { if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
}
}
} else if (currentVersion != Versions.NOT_FOUND) {
// its not deleted, its already there
if (create.origin() == Operation.Origin.RECOVERY) { if (create.origin() == Operation.Origin.RECOVERY) {
return; return;
} else if (create.origin() == Operation.Origin.REPLICA) {
// #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
// conflict, so we must also update here on the replica to remain consistent:
doUpdate = true;
} else { } else {
// On primary, we throw DAEE if the _uid is already in the index with an older version:
assert create.origin() == Operation.Origin.PRIMARY;
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id()); throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
} }
} }
create.updateVersion(updatedVersion); create.updateVersion(updatedVersion);
if (doUpdate) {
if (create.docs().size() > 1) {
writer.updateDocuments(create.uid(), create.docs(), create.analyzer());
} else {
writer.updateDocument(create.uid(), create.docs().get(0), create.analyzer());
}
} else {
if (create.docs().size() > 1) { if (create.docs().size() > 1) {
writer.addDocuments(create.docs(), create.analyzer()); writer.addDocuments(create.docs(), create.analyzer());
} else { } else {
writer.addDocument(create.docs().get(0), create.analyzer()); writer.addDocument(create.docs().get(0), create.analyzer());
} }
}
Translog.Location translogLocation = translog.add(new Translog.Create(create)); Translog.Location translogLocation = translog.add(new Translog.Create(create));
versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));