IndexShard should not return null stats (#31528)

IndexShard should not return null stats - empty stats or AlreadyCloseException if it's closed is better
This commit is contained in:
Vladimir Dolzhenko 2018-06-22 21:08:11 +02:00 committed by Colin Goodheart-Smithe
parent 1d11fdaad8
commit 2ba50dbf50
No known key found for this signature in database
GPG Key ID: F975E7BDD739B3C7
8 changed files with 148 additions and 69 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.stats; package org.elasticsearch.action.admin.cluster.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -36,6 +37,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeService; import org.elasticsearch.node.NodeService;
@ -96,13 +99,23 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) { if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards // only report on fully started shards
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
shardsStats.add( shardsStats.add(
new ShardStats( new ShardStats(
indexShard.routingEntry(), indexShard.routingEntry(),
indexShard.shardPath(), indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
indexShard.commitStats(), commitStats,
indexShard.seqNoStats())); seqNoStats));
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.stats; package org.elasticsearch.action.admin.indices.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -167,6 +168,7 @@ public class CommonStats implements Writeable, ToXContentFragment {
public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) { public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
CommonStatsFlags.Flag[] setFlags = flags.getFlags(); CommonStatsFlags.Flag[] setFlags = flags.getFlags();
for (CommonStatsFlags.Flag flag : setFlags) { for (CommonStatsFlags.Flag flag : setFlags) {
try {
switch (flag) { switch (flag) {
case Docs: case Docs:
docs = indexShard.docStats(); docs = indexShard.docStats();
@ -219,6 +221,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
default: default:
throw new IllegalStateException("Unknown Flag: " + flag); throw new IllegalStateException("Unknown Flag: " + flag);
} }
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
}
} }
} }

View File

@ -70,6 +70,7 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
return this.commonStats; return this.commonStats;
} }
@Nullable
public CommitStats getCommitStats() { public CommitStats getCommitStats() {
return this.commitStats; return this.commitStats;
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.stats; package org.elasticsearch.action.admin.indices.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
@ -33,6 +34,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -100,7 +103,17 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
} }
CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags()); CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats,
indexShard.commitStats(), indexShard.seqNoStats()); commitStats, seqNoStats);
} }
} }

View File

@ -868,21 +868,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} }
/** /**
* @return {@link CommitStats} if engine is open, otherwise null * @return {@link CommitStats}
* @throws AlreadyClosedException if shard is closed
*/ */
@Nullable
public CommitStats commitStats() { public CommitStats commitStats() {
Engine engine = getEngineOrNull(); return getEngine().commitStats();
return engine == null ? null : engine.commitStats();
} }
/** /**
* @return {@link SeqNoStats} if engine is open, otherwise null * @return {@link SeqNoStats}
* @throws AlreadyClosedException if shard is closed
*/ */
@Nullable
public SeqNoStats seqNoStats() { public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull(); return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
} }
public IndexingStats indexingStats(String... types) { public IndexingStats indexingStats(String... types) {
@ -912,8 +910,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return store.stats(); return store.stats();
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("io exception while building 'store stats'", e); throw new ElasticsearchException("io exception while building 'store stats'", e);
} catch (AlreadyClosedException ex) {
return null; // already closed
} }
} }

View File

@ -79,6 +79,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache; import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -91,6 +92,7 @@ import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -333,13 +335,24 @@ public class IndicesService extends AbstractLifecycleComponent
return null; return null;
} }
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new IndexShardStats(indexShard.shardId(), return new IndexShardStats(indexShard.shardId(),
new ShardStats[] { new ShardStats[] {
new ShardStats(indexShard.routingEntry(), new ShardStats(indexShard.routingEntry(),
indexShard.shardPath(), indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags), new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
indexShard.commitStats(), commitStats,
indexShard.seqNoStats()) seqNoStats)
}); });
} }

View File

@ -73,6 +73,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.EngineTestCase;
@ -88,6 +89,7 @@ import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -3082,4 +3084,36 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary); closeShards(primary);
} }
public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);
for (int i = 0; i < 3; i++) {
indexDoc(indexShard, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
indexShard.refresh("test"); // produce segments
}
// check stats on closed and on opened shard
if (randomBoolean()) {
closeShards(indexShard);
expectThrows(AlreadyClosedException.class, () -> indexShard.seqNoStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.commitStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.storeStats());
} else {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(2L));
final CommitStats commitStats = indexShard.commitStats();
assertThat(commitStats.getGeneration(), equalTo(2L));
final StoreStats storeStats = indexShard.storeStats();
assertThat(storeStats.sizeInBytes(), greaterThan(0L));
closeShards(indexShard);
}
}
} }

View File

@ -1111,17 +1111,21 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) { for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
try {
CommitStats commitStats = indexShard.commitStats(); CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) { if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs(); long liveDocsOnShard = commitStats.getNumDocs();
if (docsOnShards.get(syncId) != null) { if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard)); assertThat("sync id is equal but number of docs does not match on node "
+ nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got "
+ liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
} else { } else {
docsOnShards.put(syncId, liveDocsOnShard); docsOnShards.put(syncId, liveDocsOnShard);
} }
} }
} catch (AlreadyClosedException e) {
// the engine is closed or if the shard is recovering
} }
} }
} }