_nodes/stats should not fail due to concurrent AlreadyClosedException (#25016)
This catches `AlreadyClosedException` during `stats` calls to avoid failing a `_nodes/stats` request because of the ignorable, concurrent index closure.
This commit is contained in:
parent
5a4a47332c
commit
f2eeceb10d
|
@ -22,6 +22,7 @@ package org.elasticsearch.indices;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
|
@ -292,35 +293,49 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
}
|
||||
}
|
||||
|
||||
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
|
||||
for (IndexService indexService : this) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
return new NodeIndicesStats(oldStats, statsByShard(this, flags));
|
||||
}
|
||||
|
||||
Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
|
||||
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
|
||||
|
||||
for (final IndexService indexService : indicesService) {
|
||||
for (final IndexShard indexShard : indexService) {
|
||||
try {
|
||||
if (indexShard.routingEntry() == null) {
|
||||
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
|
||||
|
||||
if (indexShardStats == null) {
|
||||
continue;
|
||||
}
|
||||
IndexShardStats indexShardStats =
|
||||
new IndexShardStats(indexShard.shardId(),
|
||||
new ShardStats[]{
|
||||
new ShardStats(
|
||||
indexShard.routingEntry(),
|
||||
indexShard.shardPath(),
|
||||
new CommonStats(indicesQueryCache, indexShard, flags),
|
||||
indexShard.commitStats(),
|
||||
indexShard.seqNoStats())});
|
||||
|
||||
if (!statsByShard.containsKey(indexService.index())) {
|
||||
if (statsByShard.containsKey(indexService.index()) == false) {
|
||||
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
|
||||
} else {
|
||||
statsByShard.get(indexService.index()).add(indexShardStats);
|
||||
}
|
||||
} catch (IllegalIndexShardStateException e) {
|
||||
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
|
||||
// we can safely ignore illegal state on ones that are closing for example
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} ignoring shard stats", indexShard.shardId()), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new NodeIndicesStats(oldStats, statsByShard);
|
||||
|
||||
return statsByShard;
|
||||
}
|
||||
|
||||
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
|
||||
if (indexShard.routingEntry() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new IndexShardStats(indexShard.shardId(),
|
||||
new ShardStats[] {
|
||||
new ShardStats(indexShard.routingEntry(),
|
||||
indexShard.shardPath(),
|
||||
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
|
||||
indexShard.commitStats(),
|
||||
indexShard.seqNoStats())
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,7 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexGraveyard;
|
||||
|
@ -41,6 +44,9 @@ import org.elasticsearch.index.IndexSettings;
|
|||
import org.elasticsearch.index.mapper.KeywordFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Mapper;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
|
||||
|
@ -55,6 +61,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -66,6 +73,8 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||
|
||||
|
@ -369,4 +378,57 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
|||
assertThat(mapperService.documentMapperParser().parserContext("type").getSimilarity("test"),
|
||||
instanceOf(BM25SimilarityProvider.class));
|
||||
}
|
||||
|
||||
public void testStatsByShardDoesNotDieFromExpectedExceptions() {
|
||||
final int shardCount = randomIntBetween(2, 5);
|
||||
final int failedShardId = randomIntBetween(0, shardCount - 1);
|
||||
|
||||
final Index index = new Index("test-index", "abc123");
|
||||
// the shard that is going to fail
|
||||
final ShardId shardId = new ShardId(index, failedShardId);
|
||||
|
||||
final List<IndexShard> shards = new ArrayList<>(shardCount);
|
||||
final List<IndexShardStats> shardStats = new ArrayList<>(shardCount - 1);
|
||||
|
||||
final IndexShardState state = randomFrom(IndexShardState.values());
|
||||
final String message = "TEST - expected";
|
||||
|
||||
final RuntimeException expectedException =
|
||||
randomFrom(new IllegalIndexShardStateException(shardId, state, message), new AlreadyClosedException(message));
|
||||
|
||||
// this allows us to control the indices that exist
|
||||
final IndicesService mockIndicesService = mock(IndicesService.class);
|
||||
final IndexService indexService = mock(IndexService.class);
|
||||
|
||||
// generate fake shards and their responses
|
||||
for (int i = 0; i < shardCount; ++i) {
|
||||
final IndexShard shard = mock(IndexShard.class);
|
||||
|
||||
shards.add(shard);
|
||||
|
||||
if (failedShardId != i) {
|
||||
final IndexShardStats successfulShardStats = mock(IndexShardStats.class);
|
||||
|
||||
shardStats.add(successfulShardStats);
|
||||
|
||||
when(mockIndicesService.indexShardStats(mockIndicesService, shard, CommonStatsFlags.ALL)).thenReturn(successfulShardStats);
|
||||
} else {
|
||||
when(mockIndicesService.indexShardStats(mockIndicesService, shard, CommonStatsFlags.ALL)).thenThrow(expectedException);
|
||||
}
|
||||
}
|
||||
|
||||
when(mockIndicesService.iterator()).thenReturn(Collections.singleton(indexService).iterator());
|
||||
when(indexService.iterator()).thenReturn(shards.iterator());
|
||||
when(indexService.index()).thenReturn(index);
|
||||
|
||||
// real one, which has a logger defined
|
||||
final IndicesService indicesService = getIndicesService();
|
||||
|
||||
final Map<Index, List<IndexShardStats>> indexStats = indicesService.statsByShard(mockIndicesService, CommonStatsFlags.ALL);
|
||||
|
||||
assertThat(indexStats.isEmpty(), equalTo(false));
|
||||
assertThat("index not defined", indexStats.containsKey(index), equalTo(true));
|
||||
assertThat("unexpected shard stats", indexStats.get(index), equalTo(shardStats));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue