Fail shard (recovery) allocation on a node when the index does not exists, closes #1148.

This commit is contained in:
kimchy 2011-07-22 06:56:34 +03:00
parent a7190ea8a3
commit 8c49da12ff
7 changed files with 30 additions and 11 deletions

View File

@ -51,7 +51,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
/** /**
* Recovers the state of the shard from the gateway. * Recovers the state of the shard from the gateway.
*/ */
void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException; void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException;
/** /**
* Snapshots the given shard into the gateway. * Snapshots the given shard into the gateway.

View File

@ -26,6 +26,10 @@ import org.elasticsearch.index.shard.ShardId;
*/ */
public class IndexShardGatewayRecoveryException extends IndexShardGatewayException { public class IndexShardGatewayRecoveryException extends IndexShardGatewayException {
public IndexShardGatewayRecoveryException(ShardId shardId, String msg) {
super(shardId, msg);
}
public IndexShardGatewayRecoveryException(ShardId shardId, String msg, Throwable cause) { public IndexShardGatewayRecoveryException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause); super(shardId, msg, cause);
} }

View File

@ -151,7 +151,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
/** /**
* Recovers the state of the shard from the gateway. * Recovers the state of the shard from the gateway.
*/ */
public void recover(final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException { public void recover(final boolean indexShouldExists, final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException {
if (indexShard.state() == IndexShardState.CLOSED) { if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery // got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed"); listener.onIgnoreRecovery("shard closed");
@ -176,7 +176,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try { try {
logger.debug("starting recovery from {} ...", shardGateway); logger.debug("starting recovery from {} ...", shardGateway);
shardGateway.recover(recoveryStatus); shardGateway.recover(indexShouldExists, recoveryStatus);
lastIndexVersion = recoveryStatus.index().version(); lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = -1; lastTranslogId = -1;

View File

@ -24,7 +24,11 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Iterables; import org.elasticsearch.common.collect.Iterables;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
@ -36,7 +40,14 @@ import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.*; import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.CommitPoints;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -358,7 +369,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
} }
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { @Override public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
this.recoveryStatus = recoveryStatus; this.recoveryStatus = recoveryStatus;
final ImmutableMap<String, BlobMetaData> blobs; final ImmutableMap<String, BlobMetaData> blobs;

View File

@ -82,7 +82,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
return recoveryStatus; return recoveryStatus;
} }
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { @Override public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus.index().startTime(System.currentTimeMillis()); recoveryStatus.index().startTime(System.currentTimeMillis());
long version = -1; long version = -1;
long translogId = -1; long translogId = -1;
@ -95,6 +95,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} else { } else {
translogId = version; translogId = version;
} }
} else if (indexShouldExists) {
throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exists, but doesn't");
} }
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);

View File

@ -56,7 +56,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
return recoveryStatus; return recoveryStatus;
} }
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { @Override public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus().index().startTime(System.currentTimeMillis()); recoveryStatus().index().startTime(System.currentTimeMillis());
// in the none case, we simply start the shard // in the none case, we simply start the shard
// clean the store, there should be nothing there... // clean the store, there should be nothing there...

View File

@ -492,12 +492,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} }
if (shardRouting.initializing()) { if (shardRouting.initializing()) {
applyInitializingShard(routingTable, nodes, shardRouting); applyInitializingShard(routingTable, nodes, routingTable.index(shardRouting.index()).shard(shardRouting.id()), shardRouting);
} }
} }
} }
private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final ShardRouting shardRouting) throws ElasticSearchException { private void applyInitializingShard(final RoutingTable routingTable, final DiscoveryNodes nodes, final IndexShardRoutingTable indexShardRouting, final ShardRouting shardRouting) throws ElasticSearchException {
final IndexService indexService = indicesService.indexServiceSafe(shardRouting.index()); final IndexService indexService = indicesService.indexServiceSafe(shardRouting.index());
final int shardId = shardRouting.id(); final int shardId = shardRouting.id();
@ -582,8 +582,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} else { } else {
if (shardRouting.relocatingNodeId() == null) { if (shardRouting.relocatingNodeId() == null) {
// we are the first primary, recover from the gateway // we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
boolean indexShouldExists = indexShardRouting.allocatedPostApi();
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class); IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
shardGatewayService.recover(new IndexShardGatewayService.RecoveryListener() { shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
@Override public void onRecoveryDone() { @Override public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery from gateway"); shardStateAction.shardStarted(shardRouting, "after recovery from gateway");
} }