Allow operations to be performed on a recovering shard, using versioning for conflict detection

This commit is contained in:
Shay Banon 2011-12-08 17:41:27 +02:00
parent 1cd3af9de0
commit e56086cf7b
3 changed files with 217 additions and 226 deletions

View File

@ -374,21 +374,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void innerCreate(Create create, IndexWriter writer) throws IOException { private void innerCreate(Create create, IndexWriter writer) throws IOException {
synchronized (dirtyLock(create.uid())) { synchronized (dirtyLock(create.uid())) {
UidField uidField = create.uidField(); UidField uidField = create.uidField();
if (create.origin() == Operation.Origin.RECOVERY) { final long currentVersion;
uidField.version(create.version());
// we use update doc and not addDoc since we might get duplicates when using transient translog
if (create.docs().size() > 1) {
writer.updateDocuments(create.uid(), create.docs(), create.analyzer());
} else {
writer.updateDocument(create.uid(), create.docs().get(0), create.analyzer());
}
Translog.Location translogLocation = translog.add(new Translog.Create(create));
// on recovery, we get the actual version we want to use
if (create.version() != 0) {
versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis(), translogLocation));
}
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(create.uid().text()); VersionValue versionValue = versionMap.get(create.uid().text());
if (versionValue == null) { if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(create.uid()); currentVersion = loadCurrentVersionFromIndex(create.uid());
@ -427,18 +413,22 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
updatedVersion = create.version(); updatedVersion = create.version();
} }
} else { // if (create.origin() == Operation.Origin.REPLICA) { } else { // if (create.origin() == Operation.Origin.REPLICA || create.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = create.version(); long expectedVersion = create.version();
if (currentVersion != -2) { // -2 means we don't have a version, so ignore... if (currentVersion != -2) { // -2 means we don't have a version, so ignore...
// if it does not exists, and its considered the first index operation (replicas are 1 of) // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of)
// then nothing to do // then nothing to check
if (!(currentVersion == -1 && create.version() == 1)) { if (!(currentVersion == -1 && create.version() == 1)) {
// with replicas, we only check for previous version, we allow to set a future version // with replicas/recovery, we only check for previous version, we allow to set a future version
if (expectedVersion <= currentVersion) { if (expectedVersion <= currentVersion) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
} }
} }
} }
}
// replicas already hold the "future" version // replicas already hold the "future" version
updatedVersion = create.version(); updatedVersion = create.version();
} }
@ -446,12 +436,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// if the doc does not exists or it exists but not delete // if the doc does not exists or it exists but not delete
if (versionValue != null) { if (versionValue != null) {
if (!versionValue.delete()) { if (!versionValue.delete()) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id()); throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
} }
}
} else if (currentVersion != -1) { } else if (currentVersion != -1) {
// its not deleted, its already there // its not deleted, its already there
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id()); throw new DocumentAlreadyExistsEngineException(shardId, create.type(), create.id());
} }
}
uidField.version(updatedVersion); uidField.version(updatedVersion);
create.version(updatedVersion); create.version(updatedVersion);
@ -466,7 +464,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
} }
} }
}
@Override @Override
public void index(Index index) throws EngineException { public void index(Index index) throws EngineException {
@ -499,20 +496,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void innerIndex(Index index, IndexWriter writer) throws IOException { private void innerIndex(Index index, IndexWriter writer) throws IOException {
synchronized (dirtyLock(index.uid())) { synchronized (dirtyLock(index.uid())) {
UidField uidField = index.uidField(); UidField uidField = index.uidField();
if (index.origin() == Operation.Origin.RECOVERY) { final long currentVersion;
uidField.version(index.version());
if (index.docs().size() > 1) {
writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {
writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));
// on recovery, we get the actual version we want to use
if (index.version() != 0) {
versionMap.put(index.uid().text(), new VersionValue(index.version(), false, threadPool.estimatedTimeInMillis(), translogLocation));
}
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(index.uid().text()); VersionValue versionValue = versionMap.get(index.uid().text());
if (versionValue == null) { if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid()); currentVersion = loadCurrentVersionFromIndex(index.uid());
@ -550,18 +534,22 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
updatedVersion = index.version(); updatedVersion = index.version();
} }
} else { // if (index.origin() == Operation.Origin.REPLICA) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = index.version(); long expectedVersion = index.version();
if (currentVersion != -2) { // -2 means we don't have a version, so ignore... if (currentVersion != -2) { // -2 means we don't have a version, so ignore...
// if it does not exists, and its considered the first index operation (replicas are 1 of) // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of)
// then nothing to do // then nothing to check
if (!(currentVersion == -1 && index.version() == 1)) { if (!(currentVersion == -1 && index.version() == 1)) {
// with replicas, we only check for previous version, we allow to set a future version // with replicas/recovery, we only check for previous version, we allow to set a future version
if (expectedVersion <= currentVersion) { if (expectedVersion <= currentVersion) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
} }
} }
} }
}
// replicas already hold the "future" version // replicas already hold the "future" version
updatedVersion = index.version(); updatedVersion = index.version();
} }
@ -588,7 +576,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation)); versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));
} }
} }
}
@Override @Override
public void delete(Delete delete) throws EngineException { public void delete(Delete delete) throws EngineException {
@ -619,15 +606,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private void innerDelete(Delete delete, IndexWriter writer) throws IOException { private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
synchronized (dirtyLock(delete.uid())) { synchronized (dirtyLock(delete.uid())) {
if (delete.origin() == Operation.Origin.RECOVERY) { final long currentVersion;
writer.deleteDocuments(delete.uid());
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
// update the version with the exact version from recovery, assuming we have it
if (delete.version() != 0) {
versionMap.put(delete.uid().text(), new VersionValue(delete.version(), true, threadPool.estimatedTimeInMillis(), translogLocation));
}
} else {
long currentVersion;
VersionValue versionValue = versionMap.get(delete.uid().text()); VersionValue versionValue = versionMap.get(delete.uid().text());
if (versionValue == null) { if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid()); currentVersion = loadCurrentVersionFromIndex(delete.uid());
@ -662,17 +641,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
updatedVersion = delete.version(); updatedVersion = delete.version();
} }
} else { // if (delete.origin() == Operation.Origin.REPLICA) { } else { // if (delete.origin() == Operation.Origin.REPLICA || delete.origin() == Operation.Origin.RECOVERY) {
// on replica, the version is the future value expected (returned from the operation on the primary) // on replica, the version is the future value expected (returned from the operation on the primary)
if (currentVersion != -2) { // -2 means we don't have a version in the index, ignore if (currentVersion != -2) { // -2 means we don't have a version in the index, ignore
// only check if we have a version for it, otherwise, ignore (see later) // only check if we have a version for it, otherwise, ignore (see later)
if (currentVersion != -1) { if (currentVersion != -1) {
// with replicas, we only check for previous version, we allow to set a future version // with replicas, we only check for previous version, we allow to set a future version
if (delete.version() <= currentVersion) { if (delete.version() <= currentVersion) {
if (delete.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, delete.version()); throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, delete.version());
} }
} }
} }
}
// replicas already hold the "future" version // replicas already hold the "future" version
updatedVersion = delete.version(); updatedVersion = delete.version();
} }
@ -695,7 +678,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} }
} }
}
@Override @Override
public void delete(DeleteByQuery delete) throws EngineException { public void delete(DeleteByQuery delete) throws EngineException {

View File

@ -475,7 +475,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Override @Override
public void flush(Engine.Flush flush) throws ElasticSearchException { public void flush(Engine.Flush flush) throws ElasticSearchException {
verifyStarted(); // we allow flush while recovering, since we allow operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
verifyStartedOrRecovering();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("flush with {}", flush); logger.trace("flush with {}", flush);
} }
@ -544,6 +547,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (checkIndexOnStartup) { if (checkIndexOnStartup) {
checkIndex(true); checkIndex(true);
} }
// we disable GC of deletes since we allow operations to be executed against the shard while recovering
// but we need to make sure we don't lose deletes until we are done recovering
engine.enableGcDeletes(false);
engine.start(); engine.start();
} }
@ -572,6 +578,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
startScheduledTasksIfNeeded(); startScheduledTasksIfNeeded();
indicesLifecycle.afterIndexShardStarted(this); indicesLifecycle.afterIndexShardStarted(this);
engine.enableGcDeletes(true);
} }
public void performRecoveryOperation(Translog.Operation operation) throws ElasticSearchException { public void performRecoveryOperation(Translog.Operation operation) throws ElasticSearchException {
@ -641,14 +648,18 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
} }
public void writeAllowed() throws IllegalIndexShardStateException { private void writeAllowed() throws IllegalIndexShardStateException {
verifyStartedOrRecovering();
}
private void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED) { if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING) {
throw new IndexShardNotStartedException(shardId, state); throw new IllegalIndexShardStateException(shardId, state, "write operation only allowed when started/recovering");
} }
} }
public void verifyStarted() throws IllegalIndexShardStateException { private void verifyStarted() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED) { if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state); throw new IndexShardNotStartedException(shardId, state);

View File

@ -170,9 +170,7 @@ public class TranslogService extends AbstractIndexShardComponent {
@Override @Override
public void run() { public void run() {
try { try {
if (indexShard.state() == IndexShardState.STARTED) {
indexShard.flush(new Engine.Flush()); indexShard.flush(new Engine.Flush());
}
} catch (EngineClosedException e) { } catch (EngineClosedException e) {
// we are being closed, ignore // we are being closed, ignore
} catch (FlushNotAllowedEngineException e) { } catch (FlushNotAllowedEngineException e) {