add setters for translog location and took in engine operation result

This commit is contained in:
Areek Zillur 2016-10-25 09:58:14 -04:00
parent bb785483ae
commit 64a897e5f2
6 changed files with 75 additions and 63 deletions

View File

@ -243,7 +243,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
} catch (Exception failure) { } catch (Exception failure) {
// we may fail translating a update to index or delete operation // we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request // we use index result to communicate failure while translating update request
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0, 0); updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0);
break; // out of retry loop break; // out of retry loop
} }
// execute translated update request // execute translated update request

View File

@ -167,7 +167,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
try { try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException | IllegalArgumentException e) { } catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), 0, 0); return new Engine.IndexResult(e, request.version(), 0);
} }
Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) { if (update != null) {
@ -189,7 +189,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
try { try {
operation = prepareIndexOperationOnPrimary(request, primary); operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) { } catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), 0, 0); return new Engine.IndexResult(e, request.version(), 0);
} }
Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId(); final ShardId shardId = primary.shardId();
@ -201,13 +201,13 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings // throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(), return new Engine.IndexResult(e, request.version(),
operation.startTime() - System.nanoTime(), 0); 0);
} }
try { try {
operation = prepareIndexOperationOnPrimary(request, primary); operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) { } catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), return new Engine.IndexResult(e, request.version(),
operation.startTime() - System.nanoTime(), 0); 0);
} }
update = operation.parsedDoc().dynamicMappingsUpdate(); update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) { if (update != null) {

View File

@ -284,30 +284,23 @@ public abstract class Engine implements Closeable {
public abstract static class Result { public abstract static class Result {
private final Operation.TYPE operationType; private final Operation.TYPE operationType;
private final Translog.Location location;
private final long version; private final long version;
private final Exception failure; private final Exception failure;
private final long took;
private final int estimatedSizeInBytes; private final int estimatedSizeInBytes;
private Translog.Location location;
private long took;
private boolean freeze;
private Result(Operation.TYPE operationType, Translog.Location location, Exception failure, protected Result(Operation.TYPE operationType, Exception failure,
long version, long took, int estimatedSizeInBytes) { long version, int estimatedSizeInBytes) {
this.operationType = operationType; this.operationType = operationType;
this.location = location;
this.failure = failure; this.failure = failure;
this.version = version; this.version = version;
this.took = took;
this.estimatedSizeInBytes = estimatedSizeInBytes; this.estimatedSizeInBytes = estimatedSizeInBytes;
} }
protected Result(Operation.TYPE operationType, Translog.Location location, protected Result(Operation.TYPE operationType, long version, int estimatedSizeInBytes) {
long version, long took, int estimatedSizeInBytes) { this(operationType, null, version, estimatedSizeInBytes);
this(operationType, location, null, version, took, estimatedSizeInBytes);
}
protected Result(Operation.TYPE operationType, Exception failure,
long version, long took, int estimatedSizeInBytes) {
this(operationType, null, failure, version, took, estimatedSizeInBytes);
} }
public boolean hasFailure() { public boolean hasFailure() {
@ -340,18 +333,38 @@ public abstract class Engine implements Closeable {
} }
return estimatedSizeInBytes; return estimatedSizeInBytes;
} }
public void setLocation(Translog.Location location) {
if (freeze == false) {
this.location = location;
} else {
throw new IllegalStateException("result is already frozen");
}
}
public void setTook(long took) {
if (freeze == false) {
this.took = took;
} else {
throw new IllegalStateException("result is already frozen");
}
}
public void freeze() {
this.freeze = true;
}
} }
public static class IndexResult extends Result { public static class IndexResult extends Result {
private final boolean created; private final boolean created;
public IndexResult(Translog.Location location, long version, boolean created, long took, int estimatedSizeInBytes) { public IndexResult(long version, boolean created, int estimatedSizeInBytes) {
super(Operation.TYPE.INDEX, location, version, took, estimatedSizeInBytes); super(Operation.TYPE.INDEX, version, estimatedSizeInBytes);
this.created = created; this.created = created;
} }
public IndexResult(Exception failure, long version, long took, int estimatedSizeInBytes) { public IndexResult(Exception failure, long version, int estimatedSizeInBytes) {
super(Operation.TYPE.INDEX, failure, version, took, estimatedSizeInBytes); super(Operation.TYPE.INDEX, failure, version, estimatedSizeInBytes);
this.created = false; this.created = false;
} }
@ -363,13 +376,13 @@ public abstract class Engine implements Closeable {
public static class DeleteResult extends Result { public static class DeleteResult extends Result {
private final boolean found; private final boolean found;
public DeleteResult(Translog.Location location, long version, boolean found, long took, int estimatedSizeInBytes) { public DeleteResult(long version, boolean found, int estimatedSizeInBytes) {
super(Operation.TYPE.DELETE, location, version, took, estimatedSizeInBytes); super(Operation.TYPE.DELETE, version, estimatedSizeInBytes);
this.found = found; this.found = found;
} }
DeleteResult(Exception failure, long version, long took, int estimatedSizeInBytes) { public DeleteResult(Exception failure, long version, int estimatedSizeInBytes) {
super(Operation.TYPE.DELETE, failure, version, took, estimatedSizeInBytes); super(Operation.TYPE.DELETE, failure, version, estimatedSizeInBytes);
this.found = false; this.found = false;
} }

View File

@ -387,26 +387,6 @@ public class InternalEngine extends Engine {
return currentVersion; return currentVersion;
} }
private static VersionValueSupplier NEW_VERSION_VALUE = (u, t) -> new VersionValue(u);
@FunctionalInterface
private interface VersionValueSupplier {
VersionValue apply(long updatedVersion, long time);
}
private <T extends Engine.Operation> Translog.Location maybeAddToTranslog(
final T op,
final long updatedVersion,
final Function<T, Translog.Operation> toTranslogOp,
final VersionValueSupplier toVersionValue) throws IOException {
Translog.Location location = null;
if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
location = translog.add(toTranslogOp.apply(op));
}
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
return location;
}
@Override @Override
public IndexResult index(Index index) { public IndexResult index(Index index) {
IndexResult result; IndexResult result;
@ -423,7 +403,7 @@ public class InternalEngine extends Engine {
} catch (Exception e) { } catch (Exception e) {
Exception documentFailure = extractDocumentFailure(index, e); Exception documentFailure = extractDocumentFailure(index, e);
result = new IndexResult(documentFailure, index.version(), result = new IndexResult(documentFailure, index.version(),
index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); index.estimatedSizeInBytes());
} }
return result; return result;
} }
@ -551,7 +531,7 @@ public class InternalEngine extends Engine {
final long expectedVersion = index.version(); final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery // skip index operation because of version conflict on recovery
return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); return new IndexResult(expectedVersion, false, index.estimatedSizeInBytes());
} else { } else {
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion); index.parsedDoc().version().setLongValue(updatedVersion);
@ -561,8 +541,17 @@ public class InternalEngine extends Engine {
} else { } else {
update(index.uid(), index.docs(), indexWriter); update(index.uid(), index.docs(), indexWriter);
} }
location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); IndexResult indexResult = new IndexResult(updatedVersion, deleted, index.estimatedSizeInBytes());
return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
location = translog.add(new Translog.Index(index, indexResult));
} else {
location = null;
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setLocation(location);
indexResult.setTook(index.startTime() - System.nanoTime());
indexResult.freeze();
return indexResult;
} }
} }
} }
@ -593,7 +582,7 @@ public class InternalEngine extends Engine {
} catch (Exception e) { } catch (Exception e) {
Exception documentFailure = extractDocumentFailure(delete, e); Exception documentFailure = extractDocumentFailure(delete, e);
result = new DeleteResult(documentFailure, delete.version(), result = new DeleteResult(documentFailure, delete.version(),
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); delete.estimatedSizeInBytes());
} }
maybePruneDeletedTombstones(); maybePruneDeletedTombstones();
return result; return result;
@ -628,14 +617,24 @@ public class InternalEngine extends Engine {
final long expectedVersion = delete.version(); final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery // skip executing delete because of version conflict on recovery
return new DeleteResult(null, expectedVersion, true, return new DeleteResult(expectedVersion, true,
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); delete.estimatedSizeInBytes());
} else { } else {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); DeleteResult deleteResult = new DeleteResult(updatedVersion, found,
return new DeleteResult(location, updatedVersion, found, delete.estimatedSizeInBytes());
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else {
location = null;
}
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
deleteResult.setLocation(location);
deleteResult.setTook(delete.startTime() - System.nanoTime());
deleteResult.freeze();
return deleteResult;
} }
} }
} }

View File

@ -830,13 +830,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
public Index(Engine.Index index) { public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.id = index.id(); this.id = index.id();
this.type = index.type(); this.type = index.type();
this.source = index.source(); this.source = index.source();
this.routing = index.routing(); this.routing = index.routing();
this.parent = index.parent(); this.parent = index.parent();
this.version = index.version(); this.version = indexResult.getVersion();
this.timestamp = index.timestamp(); this.timestamp = index.timestamp();
this.ttl = index.ttl(); this.ttl = index.ttl();
this.versionType = index.versionType(); this.versionType = index.versionType();
@ -994,9 +994,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
assert versionType.validateVersionForWrites(this.version); assert versionType.validateVersionForWrites(this.version);
} }
public Delete(Engine.Delete delete) { public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
this.uid = delete.uid(); this.uid = delete.uid();
this.version = delete.version(); this.version = deleteResult.getVersion();
this.versionType = delete.versionType(); this.versionType = delete.versionType();
} }

View File

@ -112,7 +112,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
compositeListener.postDelete(delete, new Engine.DeleteResult(null, 1, true, 0, 0)); compositeListener.postDelete(delete, new Engine.DeleteResult(1, true, 0));
assertEquals(0, preIndex.get()); assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get()); assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get()); assertEquals(0, postIndexException.get());
@ -136,7 +136,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get()); assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get()); assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index, new Engine.IndexResult(null, 0, false, 0, 0)); compositeListener.postIndex(index, new Engine.IndexResult(0, false, 0));
assertEquals(0, preIndex.get()); assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get()); assertEquals(2, postIndex.get());
assertEquals(0, postIndexException.get()); assertEquals(0, postIndexException.get());