From 473c2fa8f41daac0f8627eb398819ce506b3a144 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 23 Dec 2010 10:56:37 +0200 Subject: [PATCH] add reason for state change logging in index shard, add debug logging on ignore recovery in when handling cluster change in indices cluster --- .../metadata/MetaDataCreateIndexService.java | 4 ++-- .../gateway/IndexShardGatewayService.java | 4 ++-- .../blobstore/BlobStoreIndexShardGateway.java | 2 +- .../gateway/local/LocalIndexShardGateway.java | 4 ++-- .../gateway/none/NoneIndexShardGateway.java | 2 +- .../index/service/IndexService.java | 7 +++--- .../index/service/InternalIndexService.java | 17 +++++++------- .../index/shard/recovery/RecoverySource.java | 2 +- .../index/shard/recovery/RecoveryTarget.java | 23 ++++++++++--------- .../index/shard/service/IndexShard.java | 3 +-- .../shard/service/InternalIndexShard.java | 18 +++++++-------- .../elasticsearch/indices/IndicesService.java | 4 ++-- .../indices/InternalIndicesService.java | 15 ++++++------ .../cluster/IndicesClusterStateService.java | 18 +++++++++------ 14 files changed, 64 insertions(+), 59 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index ad19b0b7720..35810d6d60c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -191,7 +191,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { try { mapperService.add(MapperService.DEFAULT_MAPPING, XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()); } catch (Exception e) { - indicesService.deleteIndex(request.index); + indicesService.deleteIndex(request.index, "failed on parsing default mapping on index creation"); throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e); } } @@ -202,7 +202,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { try { mapperService.add(entry.getKey(), XContentFactory.jsonBuilder().map(entry.getValue()).string()); } catch (Exception e) { - indicesService.deleteIndex(request.index); + indicesService.deleteIndex(request.index, "failed on parsing mappings on index creation"); throw new MapperParsingException("mapping [" + entry.getKey() + "]", e); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 51eb5d497bb..b5b48013099 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -127,7 +127,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem return; } try { - indexShard.recovering(); + indexShard.recovering("from gateway"); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering listener.onIgnoreRecovery("already in recovering process, " + e.getMessage()); @@ -150,7 +150,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem // start the shard if the gateway has not started it already if (indexShard.state() != IndexShardState.STARTED) { - indexShard.start(); + indexShard.start("post recovery from gateway"); } // refresh the shard indexShard.refresh(new Engine.Refresh(false)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index e3520e9e7b4..eb8e2b861b4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -421,7 +421,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo private void recoverTranslog(CommitPoint commitPoint, ImmutableMap blobs) throws IndexShardGatewayRecoveryException { if (commitPoint.translogFiles().isEmpty()) { // no translog files, bail - indexShard.start(); + indexShard.start("post recovery from gateway, no translog"); return; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 147b5ad8b4d..ee3e5b1951b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -96,7 +96,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen recoveryStatus.translog().startTime(System.currentTimeMillis()); if (version == -1) { // no translog files, bail - indexShard.start(); + indexShard.start("post recovery from gateway, no translog"); // no index, just start the shard and bail recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); return; @@ -119,7 +119,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen if (!recoveringTranslogFile.exists()) { // no translog to recovery from, start and bail // no translog files, bail - indexShard.start(); + indexShard.start("post recovery from gateway, no translog"); // no index, just start the shard and bail recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); return; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index 14429d42686..4f8e3d81fe4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -65,7 +65,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement } catch (IOException e) { logger.warn("failed to clean store before starting shard", e); } - indexShard.start(); + indexShard.start("post recovery from gateway"); recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); recoveryStatus.translog().startTime(System.currentTimeMillis()); recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java index 6c82d143f7f..4cacad422c2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.service; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.analysis.AnalysisService; @@ -38,7 +37,7 @@ import org.elasticsearch.index.store.IndexStore; /** * @author kimchy (shay.banon) */ -public interface IndexService extends IndexComponent, Iterable, CloseableIndexComponent { +public interface IndexService extends IndexComponent, Iterable { Injector injector(); @@ -63,12 +62,12 @@ public interface IndexService extends IndexComponent, Iterable, Clos /** * Cleans the shard locally, does not touch the gateway!. */ - void cleanShard(int shardId) throws ElasticSearchException; + void cleanShard(int shardId, String reason) throws ElasticSearchException; /** * Removes the shard, does not delete local data or the gateway. */ - void removeShard(int shardId) throws ElasticSearchException; + void removeShard(int shardId, String reason) throws ElasticSearchException; int numberOfShards(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index e1b851e93f9..c333b68c479 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardModule; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; @@ -199,7 +200,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexEngine; } - @Override public void close(final boolean delete) { + public void close(final boolean delete, final String reason) { try { Set shardIds = shardIds(); final CountDownLatch latch = new CountDownLatch(shardIds.size()); @@ -207,7 +208,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde threadPool.cached().execute(new Runnable() { @Override public void run() { try { - deleteShard(shardId, delete, !delete, delete); + deleteShard(shardId, delete, !delete, delete, reason); } catch (Exception e) { logger.warn("failed to close shard, delete [{}]", e, delete); } finally { @@ -272,15 +273,15 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexShard; } - @Override public synchronized void cleanShard(int shardId) throws ElasticSearchException { - deleteShard(shardId, true, false, false); + @Override public synchronized void cleanShard(int shardId, String reason) throws ElasticSearchException { + deleteShard(shardId, true, false, false, reason); } - @Override public synchronized void removeShard(int shardId) throws ElasticSearchException { - deleteShard(shardId, false, false, false); + @Override public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException { + deleteShard(shardId, false, false, false, reason); } - private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, boolean deleteGateway) throws ElasticSearchException { + private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, boolean deleteGateway, String reason) throws ElasticSearchException { Injector shardInjector; IndexShard indexShard; synchronized (this) { @@ -329,7 +330,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { - indexShard.close(); + ((InternalIndexShard) indexShard).close(reason); } try { shardInjector.getInstance(Engine.class).close(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 314f08f79cb..3d6893a764c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -233,7 +233,7 @@ public class RecoverySource extends AbstractComponent { if (request.markAsRelocated()) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started try { - shard.relocated(); + shard.relocated("to " + request.targetNode()); } catch (IllegalIndexShardStateException e) { // we can ignore this exception since, on the other node, when it moved to phase3 // it will also send shard started, which might cause the index shard we work against diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index ceaf01466cd..efa527dd6c6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -111,24 +111,24 @@ public class RecoveryTarget extends AbstractComponent { public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { if (request.sourceNode() == null) { - listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update"); + listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update"); return; } IndexService indexService = indicesService.indexService(request.shardId().index().name()); if (indexService == null) { removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "index missing, stop recovery"); + listener.onIgnoreRecovery(false, "index missing locally, stop recovery"); return; } final InternalIndexShard shard = (InternalIndexShard) indexService.shard(request.shardId().id()); if (shard == null) { removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "shard missing, stop recovery"); + listener.onIgnoreRecovery(false, "shard missing locally, stop recovery"); return; } if (!fromRetry) { try { - shard.recovering(); + shard.recovering("from " + request.sourceNode()); } catch (IllegalIndexShardStateException e) { // that's fine, since we might be called concurrently, just ignore this, we are already recovering listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage()); @@ -137,7 +137,7 @@ public class RecoveryTarget extends AbstractComponent { } if (shard.state() == IndexShardState.CLOSED) { removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); + listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } threadPool.cached().execute(new Runnable() { @@ -150,7 +150,7 @@ public class RecoveryTarget extends AbstractComponent { private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { if (shard.state() == IndexShardState.CLOSED) { removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); + listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } @@ -172,7 +172,8 @@ public class RecoveryTarget extends AbstractComponent { } }).txGet(); if (shard.state() == IndexShardState.CLOSED) { - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); + removeAndCleanOnGoingRecovery(shard.shardId()); + listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } stopWatch.stop(); @@ -197,7 +198,7 @@ public class RecoveryTarget extends AbstractComponent { // logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id()); if (shard.state() == IndexShardState.CLOSED) { removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); + listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } Throwable cause = ExceptionsHelper.unwrapCause(e); @@ -215,7 +216,7 @@ public class RecoveryTarget extends AbstractComponent { // here, we would add checks against exception that need to be retried (and not removeAndClean in this case) if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { - // no need to retry here, since we only get to try and recover when there is an existing shard on the other side + // if the target is not ready yet, retry listener.onRetryRecovery(TimeValue.timeValueMillis(500)); return; } @@ -228,12 +229,12 @@ public class RecoveryTarget extends AbstractComponent { removeAndCleanOnGoingRecovery(request.shardId()); if (cause instanceof ConnectTransportException) { - listener.onIgnoreRecovery(true, "source node disconnected"); + listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")"); return; } if (cause instanceof IndexShardClosedException) { - listener.onIgnoreRecovery(true, "source shard is closed"); + listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")"); return; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 4fabf27b8c8..56223019233 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.shard.service; import org.apache.lucene.index.Term; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.component.CloseableComponent; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.index.engine.Engine; @@ -38,7 +37,7 @@ import javax.annotation.Nullable; * @author kimchy (shay.banon) */ @ThreadSafe -public interface IndexShard extends IndexShardComponent, CloseableComponent { +public interface IndexShard extends IndexShardComponent { ShardRouting routingEntry(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 9e0919d7070..e69ee3c76b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -143,7 +143,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I /** * Marks the shard as recovering, fails with exception is recovering is not allowed to be set. */ - public IndexShardState recovering() throws IndexShardStartedException, + public IndexShardState recovering(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException { synchronized (mutex) { IndexShardState returnValue = state; @@ -159,24 +159,24 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (state == IndexShardState.RECOVERING) { throw new IndexShardRecoveringException(shardId); } - logger.debug("state: [{}]->[{}]", state, IndexShardState.RECOVERING); + logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason); state = IndexShardState.RECOVERING; return returnValue; } } - public InternalIndexShard relocated() throws IndexShardNotStartedException { + public InternalIndexShard relocated(String reason) throws IndexShardNotStartedException { synchronized (mutex) { if (state != IndexShardState.STARTED) { throw new IndexShardNotStartedException(shardId, state); } - logger.debug("state: [{}]->[{}]", state, IndexShardState.RELOCATED); + logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RELOCATED, reason); state = IndexShardState.RELOCATED; } return this; } - public InternalIndexShard start() throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { + public InternalIndexShard start(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new IndexShardClosedException(shardId); @@ -192,7 +192,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } engine.start(); scheduleRefresherIfNeeded(); - logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED); + logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.STARTED, reason); state = IndexShardState.STARTED; } return this; @@ -391,7 +391,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I return engine.searcher(); } - @Override public void close() { + public void close(String reason) { synchronized (mutex) { if (state != IndexShardState.CLOSED) { if (refreshScheduledFuture != null) { @@ -399,7 +399,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I refreshScheduledFuture = null; } } - logger.debug("state: [{}]->[{}]", state, IndexShardState.CLOSED); + logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason); state = IndexShardState.CLOSED; } } @@ -435,7 +435,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.flush(new Engine.Flush()); } synchronized (mutex) { - logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED); + logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.STARTED); state = IndexShardState.STARTED; } scheduleRefresherIfNeeded(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java index d77c3460a6f..ac3faeb87ee 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -52,10 +52,10 @@ public interface IndicesService extends Iterable, LifecycleCompone IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException; - void deleteIndex(String index) throws ElasticSearchException; + void deleteIndex(String index, String reason) throws ElasticSearchException; /** * Cleans the index without actually deleting any content for it. */ - void cleanIndex(String index) throws ElasticSearchException; + void cleanIndex(String index, String reason) throws ElasticSearchException; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index ac091122d58..a5d8c9bc3f1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.gateway.IndexGatewayModule; import org.elasticsearch.index.mapper.MapperServiceModule; import org.elasticsearch.index.query.IndexQueryParserModule; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; @@ -128,7 +129,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent