IndexShard should not return null stats (#31528)

IndexShard should not return null stats - empty stats or AlreadyCloseException if it's closed is better
This commit is contained in:
Vladimir Dolzhenko 2018-06-22 21:08:11 +02:00 committed by GitHub
parent 7313a987f4
commit f04c579203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 148 additions and 69 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -36,6 +37,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeService;
@ -96,13 +99,23 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
shardsStats.add(
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
indexShard.commitStats(),
indexShard.seqNoStats()));
commitStats,
seqNoStats));
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -167,57 +168,61 @@ public class CommonStats implements Writeable, ToXContentFragment {
public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
CommonStatsFlags.Flag[] setFlags = flags.getFlags();
for (CommonStatsFlags.Flag flag : setFlags) {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
try {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
}
}
}

View File

@ -70,6 +70,7 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
return this.commonStats;
}
@Nullable
public CommitStats getCommitStats() {
return this.commitStats;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.stats;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
@ -33,6 +34,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
@ -100,7 +103,17 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
}
CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats,
indexShard.commitStats(), indexShard.seqNoStats());
commitStats, seqNoStats);
}
}

View File

@ -868,21 +868,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* @return {@link CommitStats} if engine is open, otherwise null
* @return {@link CommitStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public CommitStats commitStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.commitStats();
return getEngine().commitStats();
}
/**
* @return {@link SeqNoStats} if engine is open, otherwise null
* @return {@link SeqNoStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}
public IndexingStats indexingStats(String... types) {
@ -912,8 +910,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return store.stats();
} catch (IOException e) {
throw new ElasticsearchException("io exception while building 'store stats'", e);
} catch (AlreadyClosedException ex) {
return null; // already closed
}
}

View File

@ -79,6 +79,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -91,6 +92,7 @@ import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -333,13 +335,24 @@ public class IndicesService extends AbstractLifecycleComponent
return null;
}
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new IndexShardStats(indexShard.shardId(),
new ShardStats[] {
new ShardStats(indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
indexShard.commitStats(),
indexShard.seqNoStats())
commitStats,
seqNoStats)
});
}

View File

@ -73,6 +73,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
@ -88,6 +89,7 @@ import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@ -3082,4 +3084,36 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary);
}
public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);
for (int i = 0; i < 3; i++) {
indexDoc(indexShard, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
indexShard.refresh("test"); // produce segments
}
// check stats on closed and on opened shard
if (randomBoolean()) {
closeShards(indexShard);
expectThrows(AlreadyClosedException.class, () -> indexShard.seqNoStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.commitStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.storeStats());
} else {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(2L));
final CommitStats commitStats = indexShard.commitStats();
assertThat(commitStats.getGeneration(), equalTo(2L));
final StoreStats storeStats = indexShard.storeStats();
assertThat(storeStats.sizeInBytes(), greaterThan(0L));
closeShards(indexShard);
}
}
}

View File

@ -1111,17 +1111,21 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
try {
CommitStats commitStats = indexShard.commitStats();
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs();
if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
assertThat("sync id is equal but number of docs does not match on node "
+ nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got "
+ liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
} else {
docsOnShards.put(syncId, liveDocsOnShard);
}
}
} catch (AlreadyClosedException e) {
// the engine is closed or if the shard is recovering
}
}
}