Refactor InternalEngine inner methods
This commit refactors InternalEngine#innerIndex and InternalEngine#innerDelete to collapse some common logic into a single method. This has the advantage that it shrinks the bytecode size of InternalEngine#innerIndex so that it can be inlined.
This commit is contained in:
parent
abae58b5fb
commit
4f49a261a7
|
@ -840,6 +840,10 @@ public abstract class Engine implements Closeable {
|
||||||
public long endTime() {
|
public long endTime() {
|
||||||
return this.endTime;
|
return this.endTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract String type();
|
||||||
|
|
||||||
|
abstract String id();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Index extends Operation {
|
public static class Index extends Operation {
|
||||||
|
@ -863,10 +867,12 @@ public abstract class Engine implements Closeable {
|
||||||
return this.doc;
|
return this.doc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String type() {
|
public String type() {
|
||||||
return this.doc.type();
|
return this.doc.type();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String id() {
|
public String id() {
|
||||||
return this.doc.id();
|
return this.doc.id();
|
||||||
}
|
}
|
||||||
|
@ -929,10 +935,12 @@ public abstract class Engine implements Closeable {
|
||||||
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
|
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String type() {
|
public String type() {
|
||||||
return this.type;
|
return this.type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String id() {
|
public String id() {
|
||||||
return this.id;
|
return this.id;
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,7 @@ public class InternalEngine extends Engine {
|
||||||
try (ReleasableLock lock = readLock.acquire()) {
|
try (ReleasableLock lock = readLock.acquire()) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
if (get.realtime()) {
|
if (get.realtime()) {
|
||||||
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
|
VersionValue versionValue = versionMap.getUnderLock(get.uid());
|
||||||
if (versionValue != null) {
|
if (versionValue != null) {
|
||||||
if (versionValue.delete()) {
|
if (versionValue.delete()) {
|
||||||
return GetResult.NOT_EXISTS;
|
return GetResult.NOT_EXISTS;
|
||||||
|
@ -336,6 +336,59 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean checkVersionConflict(
|
||||||
|
final Operation op,
|
||||||
|
final long currentVersion,
|
||||||
|
final long expectedVersion,
|
||||||
|
final boolean deleted) {
|
||||||
|
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
||||||
|
if (op.origin().isRecovery()) {
|
||||||
|
// version conflict, but okay
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// fatal version conflict
|
||||||
|
throw new VersionConflictEngineException(shardId, op.type(), op.id(),
|
||||||
|
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long checkDeletedAndGCed(VersionValue versionValue) {
|
||||||
|
long currentVersion;
|
||||||
|
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
||||||
|
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
||||||
|
} else {
|
||||||
|
currentVersion = versionValue.version();
|
||||||
|
}
|
||||||
|
return currentVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static VersionValueSupplier NEW_VERSION_VALUE = (u, t, l) -> new VersionValue(u, l);
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
private interface VersionValueSupplier {
|
||||||
|
VersionValue apply(long updatedVersion, long time, Translog.Location location);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T extends Engine.Operation> void maybeAddToTranslog(
|
||||||
|
final T op,
|
||||||
|
final long updatedVersion,
|
||||||
|
final Function<T, Translog.Operation> toTranslogOp,
|
||||||
|
final VersionValueSupplier toVersionValue) throws IOException {
|
||||||
|
if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||||
|
final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op));
|
||||||
|
op.setTranslogLocation(translogLocation);
|
||||||
|
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), op.getTranslogLocation()));
|
||||||
|
} else {
|
||||||
|
// we do not replay in to the translog, so there is no
|
||||||
|
// translog location; that is okay because real-time
|
||||||
|
// gets are not possible during recovery and we will
|
||||||
|
// flush when the recovery is complete
|
||||||
|
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean index(Index index) {
|
public boolean index(Index index) {
|
||||||
final boolean created;
|
final boolean created;
|
||||||
|
@ -361,58 +414,56 @@ public class InternalEngine extends Engine {
|
||||||
lastWriteNanos = index.startTime();
|
lastWriteNanos = index.startTime();
|
||||||
final long currentVersion;
|
final long currentVersion;
|
||||||
final boolean deleted;
|
final boolean deleted;
|
||||||
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
|
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
|
||||||
if (versionValue == null) {
|
if (versionValue == null) {
|
||||||
currentVersion = loadCurrentVersionFromIndex(index.uid());
|
currentVersion = loadCurrentVersionFromIndex(index.uid());
|
||||||
deleted = currentVersion == Versions.NOT_FOUND;
|
deleted = currentVersion == Versions.NOT_FOUND;
|
||||||
} else {
|
} else {
|
||||||
|
currentVersion = checkDeletedAndGCed(versionValue);
|
||||||
deleted = versionValue.delete();
|
deleted = versionValue.delete();
|
||||||
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
|
||||||
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
|
||||||
} else {
|
|
||||||
currentVersion = versionValue.version();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long expectedVersion = index.version();
|
final long expectedVersion = index.version();
|
||||||
if (isVersionConflictForWrites(index, currentVersion, deleted, expectedVersion)) {
|
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false;
|
||||||
if (!index.origin().isRecovery()) {
|
|
||||||
throw new VersionConflictEngineException(shardId, index.type(), index.id(),
|
|
||||||
index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
|
|
||||||
|
|
||||||
final boolean created;
|
final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
|
||||||
index.updateVersion(updatedVersion);
|
|
||||||
|
|
||||||
if (currentVersion == Versions.NOT_FOUND) {
|
final boolean created = indexOrUpdate(index, currentVersion, versionValue);
|
||||||
// document does not exists, we can optimize for create
|
|
||||||
created = true;
|
|
||||||
index(index, indexWriter);
|
|
||||||
} else {
|
|
||||||
created = update(index, versionValue, indexWriter);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
|
||||||
final Translog.Location translogLocation = translog.add(new Translog.Index(index));
|
|
||||||
index.setTranslogLocation(translogLocation);
|
|
||||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, index.getTranslogLocation()));
|
|
||||||
} else {
|
|
||||||
// we do not replay in to the translog, so there is no
|
|
||||||
// translog location; that is okay because real-time
|
|
||||||
// gets are not possible during recovery and we will
|
|
||||||
// flush when the recovery is complete
|
|
||||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, null));
|
|
||||||
}
|
|
||||||
|
|
||||||
return created;
|
return created;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean update(Index index, VersionValue versionValue, IndexWriter indexWriter) throws IOException {
|
private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) {
|
||||||
boolean created;
|
final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion);
|
||||||
|
op.updateVersion(updatedVersion);
|
||||||
|
return updatedVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
|
||||||
|
final boolean created;
|
||||||
|
if (currentVersion == Versions.NOT_FOUND) {
|
||||||
|
// document does not exists, we can optimize for create
|
||||||
|
created = true;
|
||||||
|
index(index, indexWriter);
|
||||||
|
} else {
|
||||||
|
created = update(index, versionValue, indexWriter);
|
||||||
|
}
|
||||||
|
return created;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
|
||||||
|
if (index.docs().size() > 1) {
|
||||||
|
indexWriter.addDocuments(index.docs());
|
||||||
|
} else {
|
||||||
|
indexWriter.addDocument(index.docs().get(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
|
||||||
|
final boolean created;
|
||||||
if (versionValue != null) {
|
if (versionValue != null) {
|
||||||
created = versionValue.delete(); // we have a delete which is not GC'ed...
|
created = versionValue.delete(); // we have a delete which is not GC'ed...
|
||||||
} else {
|
} else {
|
||||||
|
@ -426,18 +477,6 @@ public class InternalEngine extends Engine {
|
||||||
return created;
|
return created;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void index(Index index, IndexWriter indexWriter) throws IOException {
|
|
||||||
if (index.docs().size() > 1) {
|
|
||||||
indexWriter.addDocuments(index.docs());
|
|
||||||
} else {
|
|
||||||
indexWriter.addDocument(index.docs().get(0));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isVersionConflictForWrites(Index index, long currentVersion, boolean deleted, long expectedVersion) {
|
|
||||||
return index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void delete(Delete delete) throws EngineException {
|
public void delete(Delete delete) throws EngineException {
|
||||||
try (ReleasableLock lock = readLock.acquire()) {
|
try (ReleasableLock lock = readLock.acquire()) {
|
||||||
|
@ -465,59 +504,44 @@ public class InternalEngine extends Engine {
|
||||||
lastWriteNanos = delete.startTime();
|
lastWriteNanos = delete.startTime();
|
||||||
final long currentVersion;
|
final long currentVersion;
|
||||||
final boolean deleted;
|
final boolean deleted;
|
||||||
VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
|
final VersionValue versionValue = versionMap.getUnderLock(delete.uid());
|
||||||
if (versionValue == null) {
|
if (versionValue == null) {
|
||||||
currentVersion = loadCurrentVersionFromIndex(delete.uid());
|
currentVersion = loadCurrentVersionFromIndex(delete.uid());
|
||||||
deleted = currentVersion == Versions.NOT_FOUND;
|
deleted = currentVersion == Versions.NOT_FOUND;
|
||||||
} else {
|
} else {
|
||||||
|
currentVersion = checkDeletedAndGCed(versionValue);
|
||||||
deleted = versionValue.delete();
|
deleted = versionValue.delete();
|
||||||
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
|
||||||
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
|
||||||
} else {
|
|
||||||
currentVersion = versionValue.version();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long updatedVersion;
|
final long expectedVersion = delete.version();
|
||||||
long expectedVersion = delete.version();
|
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) return;
|
||||||
if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
|
||||||
if (delete.origin().isRecovery()) {
|
final long updatedVersion = updateVersion(delete, currentVersion, expectedVersion);
|
||||||
return;
|
|
||||||
} else {
|
final boolean found = deleteIfFound(delete, currentVersion, deleted, versionValue);
|
||||||
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(),
|
|
||||||
delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
|
|
||||||
final boolean found;
|
|
||||||
if (currentVersion == Versions.NOT_FOUND) {
|
|
||||||
// doc does not exist and no prior deletes
|
|
||||||
found = false;
|
|
||||||
} else if (versionValue != null && versionValue.delete()) {
|
|
||||||
// a "delete on delete", in this case, we still increment the version, log it, and return that version
|
|
||||||
found = false;
|
|
||||||
} else {
|
|
||||||
// we deleted a currently existing document
|
|
||||||
indexWriter.deleteDocuments(delete.uid());
|
|
||||||
found = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
delete.updateVersion(updatedVersion, found);
|
delete.updateVersion(updatedVersion, found);
|
||||||
|
|
||||||
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new);
|
||||||
final Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
|
||||||
delete.setTranslogLocation(translogLocation);
|
|
||||||
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), delete.getTranslogLocation()));
|
|
||||||
} else {
|
|
||||||
// we do not replay in to the translog, so there is no
|
|
||||||
// translog location; that is okay because real-time
|
|
||||||
// gets are not possible during recovery and we will
|
|
||||||
// flush when the recovery is complete
|
|
||||||
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean deleteIfFound(Delete delete, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
|
||||||
|
final boolean found;
|
||||||
|
if (currentVersion == Versions.NOT_FOUND) {
|
||||||
|
// doc does not exist and no prior deletes
|
||||||
|
found = false;
|
||||||
|
} else if (versionValue != null && deleted) {
|
||||||
|
// a "delete on delete", in this case, we still increment the version, log it, and return that version
|
||||||
|
found = false;
|
||||||
|
} else {
|
||||||
|
// we deleted a currently existing document
|
||||||
|
indexWriter.deleteDocuments(delete.uid());
|
||||||
|
found = true;
|
||||||
|
}
|
||||||
|
return found;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refresh(String source) throws EngineException {
|
public void refresh(String source) throws EngineException {
|
||||||
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
|
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.engine;
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.search.ReferenceManager;
|
import org.apache.lucene.search.ReferenceManager;
|
||||||
import org.apache.lucene.util.Accountable;
|
import org.apache.lucene.util.Accountable;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
@ -126,21 +127,21 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns the live version (add or delete) for this uid. */
|
/** Returns the live version (add or delete) for this uid. */
|
||||||
VersionValue getUnderLock(BytesRef uid) {
|
VersionValue getUnderLock(final Term uid) {
|
||||||
Maps currentMaps = maps;
|
Maps currentMaps = maps;
|
||||||
|
|
||||||
// First try to get the "live" value:
|
// First try to get the "live" value:
|
||||||
VersionValue value = currentMaps.current.get(uid);
|
VersionValue value = currentMaps.current.get(uid.bytes());
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
value = currentMaps.old.get(uid);
|
value = currentMaps.old.get(uid.bytes());
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
return tombstones.get(uid);
|
return tombstones.get(uid.bytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Adds this uid/version to the pending adds map. */
|
/** Adds this uid/version to the pending adds map. */
|
||||||
|
|
Loading…
Reference in New Issue