add reason for state change logging in index shard, add debug logging on ignore recovery when handling cluster changes in indices cluster state service

kimchy 2010-12-23 10:56:37 +02:00
parent 5ac42f2a4f
commit 473c2fa8f4
14 changed files with 64 additions and 59 deletions

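The change threads a human-readable reason through every shard state transition and logs it when the state flips. A minimal sketch of that logging pattern, assuming a simplified state holder and plain JDK logging rather than the real InternalIndexShard and its logger:

```java
import java.util.logging.Logger;

// Simplified stand-ins; not the real elasticsearch types.
enum IndexShardState { CREATED, RECOVERING, STARTED, RELOCATED, CLOSED }

class ShardStateSketch {
    private static final Logger logger = Logger.getLogger(ShardStateSketch.class.getName());
    private final Object mutex = new Object();
    private IndexShardState state = IndexShardState.CREATED;

    // Every transition now takes a free-form reason and logs it at debug level,
    // matching the message shape in the diff: state: [OLD]->[NEW], reason [...]
    IndexShardState moveTo(IndexShardState newState, String reason) {
        synchronized (mutex) {
            IndexShardState previous = state;
            logger.fine(String.format("state: [%s]->[%s], reason [%s]", previous, newState, reason));
            state = newState;
            return previous;
        }
    }
}
```

The per-file diffs below apply this pattern to recovering(), start(), relocated(), and close(), and thread a reason through cleanShard(), removeShard(), cleanIndex(), and deleteIndex() so the cause of each transition shows up in the logs.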
MetaDataCreateIndexService.java

@@ -191,7 +191,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
try {
mapperService.add(MapperService.DEFAULT_MAPPING, XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string());
} catch (Exception e) {
indicesService.deleteIndex(request.index);
indicesService.deleteIndex(request.index, "failed on parsing default mapping on index creation");
throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e);
}
}
@@ -202,7 +202,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
try {
mapperService.add(entry.getKey(), XContentFactory.jsonBuilder().map(entry.getValue()).string());
} catch (Exception e) {
indicesService.deleteIndex(request.index);
indicesService.deleteIndex(request.index, "failed on parsing mappings on index creation");
throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);
}
}

IndexShardGatewayService.java

@@ -127,7 +127,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
try {
indexShard.recovering();
indexShard.recovering("from gateway");
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery("already in recovering process, " + e.getMessage());
@@ -150,7 +150,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
indexShard.start();
indexShard.start("post recovery from gateway");
}
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));

BlobStoreIndexShardGateway.java

@@ -421,7 +421,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private void recoverTranslog(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws IndexShardGatewayRecoveryException {
if (commitPoint.translogFiles().isEmpty()) {
// no translog files, bail
indexShard.start();
indexShard.start("post recovery from gateway, no translog");
return;
}

LocalIndexShardGateway.java

@@ -96,7 +96,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveryStatus.translog().startTime(System.currentTimeMillis());
if (version == -1) {
// no translog files, bail
indexShard.start();
indexShard.start("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;
@@ -119,7 +119,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
if (!recoveringTranslogFile.exists()) {
// no translog to recovery from, start and bail
// no translog files, bail
indexShard.start();
indexShard.start("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;

NoneIndexShardGateway.java

@@ -65,7 +65,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
}
indexShard.start();
indexShard.start("post recovery from gateway");
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
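
The gateway recovery paths above now name where the recovery came from and why the shard is being started. A hedged caller-side sketch of that flow; ShardStateApi is an invented stand-in for the changed IndexShard methods, not the real interface:

```java
// Invented names; only the method signatures mirror the diff.
interface ShardStateApi {
    void recovering(String reason);
    void start(String reason);
}

class GatewayRecoverySketch {
    // Mirrors LocalIndexShardGateway / BlobStoreIndexShardGateway above: start the
    // shard early with a "no translog" reason, otherwise start it after replay.
    static void recoverFromGateway(ShardStateApi shard, boolean translogFound) {
        shard.recovering("from gateway");
        if (!translogFound) {
            shard.start("post recovery from gateway, no translog");
            return;
        }
        // ... replay translog operations against the engine here ...
        shard.start("post recovery from gateway");
    }
}
```

Because the reason is logged at the transition itself, the same "post recovery from gateway" text appears whether the shard is started by the local, blob store, or none gateway.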

IndexService.java

@@ -22,7 +22,6 @@ package org.elasticsearch.index.service;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.analysis.AnalysisService;
@@ -38,7 +37,7 @@ import org.elasticsearch.index.store.IndexStore;
/**
* @author kimchy (shay.banon)
*/
public interface IndexService extends IndexComponent, Iterable<IndexShard>, CloseableIndexComponent {
public interface IndexService extends IndexComponent, Iterable<IndexShard> {
Injector injector();
@@ -63,12 +62,12 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
/**
* Cleans the shard locally, does not touch the gateway!.
*/
void cleanShard(int shardId) throws ElasticSearchException;
void cleanShard(int shardId, String reason) throws ElasticSearchException;
/**
* Removes the shard, does not delete local data or the gateway.
*/
void removeShard(int shardId) throws ElasticSearchException;
void removeShard(int shardId, String reason) throws ElasticSearchException;
int numberOfShards();

InternalIndexService.java

@@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.IndexShardManagement;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
@@ -199,7 +200,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine;
}
@Override public void close(final boolean delete) {
public void close(final boolean delete, final String reason) {
try {
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
@@ -207,7 +208,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
deleteShard(shardId, delete, !delete, delete);
deleteShard(shardId, delete, !delete, delete, reason);
} catch (Exception e) {
logger.warn("failed to close shard, delete [{}]", e, delete);
} finally {
@@ -272,15 +273,15 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexShard;
}
@Override public synchronized void cleanShard(int shardId) throws ElasticSearchException {
deleteShard(shardId, true, false, false);
@Override public synchronized void cleanShard(int shardId, String reason) throws ElasticSearchException {
deleteShard(shardId, true, false, false, reason);
}
@Override public synchronized void removeShard(int shardId) throws ElasticSearchException {
deleteShard(shardId, false, false, false);
@Override public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException {
deleteShard(shardId, false, false, false, reason);
}
private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, boolean deleteGateway) throws ElasticSearchException {
private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, boolean deleteGateway, String reason) throws ElasticSearchException {
Injector shardInjector;
IndexShard indexShard;
synchronized (this) {
@@ -329,7 +330,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
indexShard.close();
((InternalIndexShard) indexShard).close(reason);
}
try {
shardInjector.getInstance(Engine.class).close();

RecoverySource.java

@@ -233,7 +233,7 @@ public class RecoverySource extends AbstractComponent {
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated();
shard.relocated("to " + request.targetNode());
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against

RecoveryTarget.java

@@ -111,24 +111,24 @@ public class RecoveryTarget extends AbstractComponent {
public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update");
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update");
return;
}
IndexService indexService = indicesService.indexService(request.shardId().index().name());
if (indexService == null) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "index missing, stop recovery");
listener.onIgnoreRecovery(false, "index missing locally, stop recovery");
return;
}
final InternalIndexShard shard = (InternalIndexShard) indexService.shard(request.shardId().id());
if (shard == null) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "shard missing, stop recovery");
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
return;
}
if (!fromRetry) {
try {
shard.recovering();
shard.recovering("from " + request.sourceNode());
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
@@ -137,7 +137,7 @@ public class RecoveryTarget extends AbstractComponent {
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
threadPool.cached().execute(new Runnable() {
@@ -150,7 +150,7 @@ public class RecoveryTarget extends AbstractComponent {
private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
@@ -172,7 +172,8 @@ public class RecoveryTarget extends AbstractComponent {
}
}).txGet();
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
removeAndCleanOnGoingRecovery(shard.shardId());
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
stopWatch.stop();
@@ -197,7 +198,7 @@ public class RecoveryTarget extends AbstractComponent {
// logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
@@ -215,7 +216,7 @@ public class RecoveryTarget extends AbstractComponent {
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// no need to retry here, since we only get to try and recover when there is an existing shard on the other side
// if the target is not ready yet, retry
listener.onRetryRecovery(TimeValue.timeValueMillis(500));
return;
}
@@ -228,12 +229,12 @@ public class RecoveryTarget extends AbstractComponent {
removeAndCleanOnGoingRecovery(request.shardId());
if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected");
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
return;
}
if (cause instanceof IndexShardClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed");
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}

IndexShard.java

@@ -22,7 +22,6 @@ package org.elasticsearch.index.shard.service;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine;
@@ -38,7 +37,7 @@ import javax.annotation.Nullable;
* @author kimchy (shay.banon)
*/
@ThreadSafe
public interface IndexShard extends IndexShardComponent, CloseableComponent {
public interface IndexShard extends IndexShardComponent {
ShardRouting routingEntry();

InternalIndexShard.java

@@ -143,7 +143,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
/**
* Marks the shard as recovering, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering() throws IndexShardStartedException,
public IndexShardState recovering(String reason) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
IndexShardState returnValue = state;
@@ -159,24 +159,24 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state == IndexShardState.RECOVERING) {
throw new IndexShardRecoveringException(shardId);
}
logger.debug("state: [{}]->[{}]", state, IndexShardState.RECOVERING);
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason);
state = IndexShardState.RECOVERING;
return returnValue;
}
}
public InternalIndexShard relocated() throws IndexShardNotStartedException {
public InternalIndexShard relocated(String reason) throws IndexShardNotStartedException {
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
logger.debug("state: [{}]->[{}]", state, IndexShardState.RELOCATED);
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RELOCATED, reason);
state = IndexShardState.RELOCATED;
}
return this;
}
public InternalIndexShard start() throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
public InternalIndexShard start(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
@@ -192,7 +192,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
engine.start();
scheduleRefresherIfNeeded();
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.STARTED, reason);
state = IndexShardState.STARTED;
}
return this;
@@ -391,7 +391,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return engine.searcher();
}
@Override public void close() {
public void close(String reason) {
synchronized (mutex) {
if (state != IndexShardState.CLOSED) {
if (refreshScheduledFuture != null) {
@ -399,7 +399,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
refreshScheduledFuture = null;
}
}
logger.debug("state: [{}]->[{}]", state, IndexShardState.CLOSED);
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
state = IndexShardState.CLOSED;
}
}
@@ -435,7 +435,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.flush(new Engine.Flush());
}
synchronized (mutex) {
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();

IndicesService.java

@@ -52,10 +52,10 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException;
void deleteIndex(String index) throws ElasticSearchException;
void deleteIndex(String index, String reason) throws ElasticSearchException;
/**
* Cleans the index without actually deleting any content for it.
*/
void cleanIndex(String index) throws ElasticSearchException;
void cleanIndex(String index, String reason) throws ElasticSearchException;
}

InternalIndicesService.java

@@ -49,6 +49,7 @@ import org.elasticsearch.index.gateway.IndexGatewayModule;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.query.IndexQueryParserModule;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
@@ -128,7 +129,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
deleteIndex(index, false);
deleteIndex(index, false, "shutdown");
} catch (Exception e) {
logger.warn("failed to delete index on stop [" + index + "]", e);
} finally {
@@ -244,15 +245,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
return indexService;
}
@Override public synchronized void cleanIndex(String index) throws ElasticSearchException {
deleteIndex(index, false);
@Override public synchronized void cleanIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, false, reason);
}
@Override public synchronized void deleteIndex(String index) throws ElasticSearchException {
deleteIndex(index, true);
@Override public synchronized void deleteIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, true, reason);
}
private void deleteIndex(String index, boolean delete) throws ElasticSearchException {
private void deleteIndex(String index, boolean delete, String reason) throws ElasticSearchException {
Injector indexInjector;
IndexService indexService;
synchronized (this) {
@@ -278,7 +279,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexInjector.getInstance(closeable).close(delete);
}
indexService.close(delete);
((InternalIndexService) indexService).close(delete, reason);
indexInjector.getInstance(IndexCache.class).close();
indexInjector.getInstance(AnalysisService.class).close();

IndicesClusterStateService.java

@@ -20,6 +20,7 @@
package org.elasticsearch.indices.cluster;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
@@ -141,7 +142,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
// clean the index
try {
indicesService.cleanIndex(index);
indicesService.cleanIndex(index, "cleaning index (no shards allocated)");
} catch (Exception e) {
logger.warn("failed to clean index (no shards of that index are allocated on this node)", e);
}
@@ -156,7 +157,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] deleting index", index);
}
try {
indicesService.deleteIndex(index);
indicesService.deleteIndex(index, "deleting index");
threadPool.execute(new Runnable() {
@Override public void run() {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
@@ -194,12 +195,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard (index is closed)", index, existingShardId);
}
indexService.removeShard(existingShardId);
indexService.removeShard(existingShardId, "removing shard (index is closed)");
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] cleaning shard locally (not allocated)", index, existingShardId);
}
indexService.cleanShard(existingShardId);
indexService.cleanShard(existingShardId, "cleaning shard locally (not allocated)");
}
}
}
@@ -370,7 +371,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Exception e) {
logger.warn("[{}][{}] failed to create shard", e, shardRouting.index(), shardRouting.id());
try {
indexService.cleanShard(shardId);
indexService.cleanShard(shardId, "failed to create [" + ExceptionsHelper.detailedMessage(e) + "]");
} catch (IndexShardMissingException e1) {
// ignore
} catch (Exception e1) {
@@ -470,8 +471,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard on ignored recovery, reason [{}]", shardRouting.index(), shardRouting.shardId().id(), reason);
}
try {
indexService.removeShard(shardRouting.shardId().id());
indexService.removeShard(shardRouting.shardId().id(), "ignore recovery: " + reason);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
@@ -491,7 +495,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
indexService.cleanShard(shardRouting.shardId().id());
indexService.cleanShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Exception e1) {
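
The -470,8 hunk above is the debug logging on ignored recovery called out in the commit message. A minimal sketch of that handler's shape, using invented stand-in types and JDK logging rather than the real IndicesClusterStateService listener:

```java
import java.util.logging.Logger;

class IgnoreRecoverySketch {
    private static final Logger logger = Logger.getLogger(IgnoreRecoverySketch.class.getName());

    // Invented stand-in for the per-index service used in the handler above.
    interface IndexServiceLike {
        boolean hasShard(int shardId);
        void removeShard(int shardId, String reason);
    }

    // Mirrors the new branch: log why the recovery was ignored, then remove the local
    // shard, forwarding the reason so the shard state log carries it as well.
    void onIgnoreRecovery(IndexServiceLike indexService, String index, int shardId, String reason) {
        if (!indexService.hasShard(shardId)) {
            return;
        }
        logger.fine("[" + index + "][" + shardId + "] removing shard on ignored recovery, reason [" + reason + "]");
        indexService.removeShard(shardId, "ignore recovery: " + reason);
    }
}
```

Passing the composed "ignore recovery: ..." string into removeShard() means the ignore cause also appears in the shard's own state-change log line.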