Failure to recover a shard might cause loss of translog data (especially with no replicas), closes #869.

This commit is contained in:
kimchy 2011-04-20 04:52:47 +03:00
parent 3c233347b8
commit f5dbcb2fa4
2 changed files with 17 additions and 8 deletions

View File

@ -144,15 +144,24 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
try { try {
InputStreamStreamInput si = new InputStreamStreamInput(new FileInputStream(recoveringTranslogFile)); InputStreamStreamInput si = new InputStreamStreamInput(new FileInputStream(recoveringTranslogFile));
while (true) { while (true) {
int opSize = si.readInt(); Translog.Operation operation;
Translog.Operation operation = TranslogStreams.readTranslogOperation(si); try {
int opSize = si.readInt();
operation = TranslogStreams.readTranslogOperation(si);
} catch (EOFException e) {
// ignore, not properly written the last op
break;
} catch (IOException e) {
// ignore, not properly written last op
break;
}
recoveryStatus.translog().addTranslogOperations(1); recoveryStatus.translog().addTranslogOperations(1);
indexShard.performRecoveryOperation(operation); indexShard.performRecoveryOperation(operation);
} }
} catch (EOFException e) { } catch (Throwable e) {
// ignore this exception, its fine // we failed to recovery, make sure to delete the translog file (and keep the recovering one)
} catch (IOException e) { indexShard.translog().close(true);
// ignore this as well throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e);
} }
indexShard.performRecoveryFinalization(true); indexShard.performRecoveryFinalization(true);

View File

@ -539,7 +539,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
synchronized (mutex) { synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) { if (indexService.hasShard(shardRouting.shardId().id())) {
try { try {
indexService.cleanShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) { } catch (IndexShardMissingException e) {
// the node got closed on us, ignore it // the node got closed on us, ignore it
} catch (Exception e1) { } catch (Exception e1) {
@ -576,7 +576,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
synchronized (mutex) { synchronized (mutex) {
if (indexService.hasShard(shardId.id())) { if (indexService.hasShard(shardId.id())) {
try { try {
indexService.cleanShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); indexService.removeShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) { } catch (IndexShardMissingException e) {
// the node got closed on us, ignore it // the node got closed on us, ignore it
} catch (Exception e1) { } catch (Exception e1) {