We were not closing repositories on Node shutdown. In production this has little effect, but in tests a node that uses `MockRepository` and is stuck in a simulated blocked-IO situation while it is being shut down will only unblock once the node's threadpool is interrupted. In some edge cases (many snapshot threads and some CI slowness) this can result in the shutdown taking longer than 5s to release all the shard stores, and thus we fail the assertion about unreleased shard stores in the internal test cluster. Regardless of tests, I think we should close repositories and release the resources associated with them when closing a node, and not just when removing a repository from the cluster state (CS) while nodes are running, because the current behavior is really unexpected. Fixes #47689
This commit is contained in:
parent
f1bc3a0753
commit
0ca7cc1848
|
@ -668,6 +668,7 @@ public class Node implements Closeable {
|
|||
injector.getInstance(IndicesClusterStateService.class).start();
|
||||
injector.getInstance(SnapshotsService.class).start();
|
||||
injector.getInstance(SnapshotShardsService.class).start();
|
||||
injector.getInstance(RepositoriesService.class).start();
|
||||
injector.getInstance(SearchService.class).start();
|
||||
nodeService.getMonitorService().start();
|
||||
|
||||
|
@ -780,6 +781,7 @@ public class Node implements Closeable {
|
|||
|
||||
injector.getInstance(SnapshotsService.class).stop();
|
||||
injector.getInstance(SnapshotShardsService.class).stop();
|
||||
injector.getInstance(RepositoriesService.class).stop();
|
||||
// stop any changes happening as a result of cluster state changes
|
||||
injector.getInstance(IndicesClusterStateService.class).stop();
|
||||
// close discovery early to not react to pings anymore.
|
||||
|
|
|
@ -37,15 +37,19 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -54,7 +58,7 @@ import java.util.Map;
|
|||
/**
|
||||
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
|
||||
*/
|
||||
public class RepositoriesService implements ClusterStateApplier {
|
||||
public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);
|
||||
|
||||
|
@ -95,6 +99,8 @@ public class RepositoriesService implements ClusterStateApplier {
|
|||
* @param listener register repository listener
|
||||
*/
|
||||
public void registerRepository(final PutRepositoryRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";
|
||||
|
||||
final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name(), request.type(), request.settings());
|
||||
validate(request.name());
|
||||
|
||||
|
@ -435,4 +441,23 @@ public class RepositoriesService implements ClusterStateApplier {
|
|||
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Lifecycle start hook. The body is intentionally empty: nothing in this
 * method runs when the service is started.
 */
@Override
|
||||
protected void doStart() {
|
||||
|
||||
}
|
||||
|
||||
/**
 * Lifecycle stop hook. The body is intentionally empty: resources are
 * released in {@code doClose()}, not here.
 */
@Override
|
||||
protected void doStop() {
|
||||
|
||||
}
|
||||
|
||||
/**
 * Lifecycle close hook: unregisters this service as a cluster state applier
 * and closes every repository it currently holds.
 *
 * @throws IOException if closing the collected repositories fails
 */
@Override
|
||||
protected void doClose() throws IOException {
|
||||
// Unregister first so this service is no longer registered as an applier while repositories are being closed.
clusterService.removeApplier(this);
|
||||
// Gather both kinds of repositories into one collection so they can be closed with a single call.
final Collection<Repository> repos = new ArrayList<>();
|
||||
repos.addAll(internalRepositories.values());
|
||||
repos.addAll(repositories.values());
|
||||
// NOTE(review): IOUtils.close presumably attempts to close every element even if one fails — confirm against IOUtils.
IOUtils.close(repos);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ public class RepositoriesServiceTests extends ESTestCase {
|
|||
Collections.emptySet());
|
||||
repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class),
|
||||
transportService, Collections.emptyMap(), Collections.singletonMap(TestRepository.TYPE, TestRepository::new), threadPool);
|
||||
repositoriesService.start();
|
||||
}
|
||||
|
||||
public void testRegisterInternalRepository() {
|
||||
|
|
|
@ -1186,6 +1186,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
transportService.acceptIncomingRequests();
|
||||
snapshotsService.start();
|
||||
snapshotShardsService.start();
|
||||
repositoriesService.start();
|
||||
final CoordinationState.PersistedState persistedState =
|
||||
new InMemoryPersistedState(initialState.term(), stateForNode(initialState, node));
|
||||
coordinator = new Coordinator(node.getName(), clusterService.getSettings(),
|
||||
|
|
|
@ -127,7 +127,6 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
|||
return settings.build();
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47689")
|
||||
public void testSnapshotInProgress() throws Exception {
|
||||
final String indexName = "test";
|
||||
final String policyName = "test-policy";
|
||||
|
|
Loading…
Reference in New Issue