From 92eb6e7844b410660df706477deaf47c39f753e5 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 1 Sep 2020 11:12:52 +0200 Subject: [PATCH] Remove cluster state listener in BlobStoreCacheService (#61726) (#61769) BlobStoreCacheService implements ClusterStateListener in order to maintain a ready flag that can be used to know when the snapshot blob cache should be queries or not. Now the getAsync() method correctly handles the various exceptions that can be thrown when the .snapshot-blob-cache index is not available(in isExpectedCacheGetException()) and logs as DEBUG we can safely remove the ready flag. --- .../cache/BlobStoreCacheService.java | 44 +------------------ 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java index ab7c21ecdb2..24cb658850b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -23,13 +23,9 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.ToXContent; @@ -41,13 +37,12 @@ import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; import java.time.Instant; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; -public class BlobStoreCacheService extends AbstractLifecycleComponent implements ClusterStateListener { +public class BlobStoreCacheService { private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); @@ -55,46 +50,16 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent implements private final ClusterService clusterService; private final ThreadPool threadPool; - private final AtomicBoolean ready; private final Client client; private final String index; public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); - this.ready = new AtomicBoolean(false); this.clusterService = clusterService; this.threadPool = threadPool; this.index = index; } - @Override - protected void doStart() { - clusterService.addListener(this); - } - - @Override - protected void doStop() { - clusterService.removeListener(this); - } - - @Override - protected void doClose() {} - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (lifecycle.started() == false || event.routingTableChanged() == false) { - return; - } - if (event.indexRoutingTableChanged(index)) { - final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); - if (indexRoutingTable == null) { - ready.set(false); - return; - } - ready.set(indexRoutingTable.allPrimaryShardsActive()); - } - } - private void createIndexIfNecessary(ActionListener listener) { if (clusterService.state().routingTable().hasIndex(index)) { listener.onResponse(index); @@ -252,13 +217,6 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent implements } protected void getAsync(String repository, String name, String path, long offset, ActionListener listener) { - if ((lifecycle.started() && ready.get()) == false) { - // TODO TBD can we just execute the GET request and let it fail if the index isn't ready yet? - // We might get lucky and hit a started shard anyway. - logger.debug("not ready : [{}]", CachedBlob.generateId(repository, name, path, offset)); - listener.onResponse(CachedBlob.CACHE_NOT_READY); - return; - } final GetRequest request = new GetRequest(index).id(CachedBlob.generateId(repository, name, path, offset)); client.get(request, new ActionListener() { @Override