BlobStoreCacheService implements ClusterStateListener in order to maintain a ready flag that can be used to know when the snapshot blob cache should be queries or not. Now the getAsync() method correctly handles the various exceptions that can be thrown when the .snapshot-blob-cache index is not available(in isExpectedCacheGetException()) and logs as DEBUG we can safely remove the ready flag.
This commit is contained in:
parent
8fdd3d158b
commit
92eb6e7844
|
@ -23,13 +23,9 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
|
@ -41,13 +37,12 @@ import org.elasticsearch.transport.ConnectTransportException;
|
|||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
|
||||
|
||||
public class BlobStoreCacheService extends AbstractLifecycleComponent implements ClusterStateListener {
|
||||
public class BlobStoreCacheService {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);
|
||||
|
||||
|
@ -55,46 +50,16 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent implements
|
|||
|
||||
private final ClusterService clusterService;
|
||||
private final ThreadPool threadPool;
|
||||
private final AtomicBoolean ready;
|
||||
private final Client client;
|
||||
private final String index;
|
||||
|
||||
public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) {
|
||||
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
|
||||
this.ready = new AtomicBoolean(false);
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
clusterService.removeListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (lifecycle.started() == false || event.routingTableChanged() == false) {
|
||||
return;
|
||||
}
|
||||
if (event.indexRoutingTableChanged(index)) {
|
||||
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index);
|
||||
if (indexRoutingTable == null) {
|
||||
ready.set(false);
|
||||
return;
|
||||
}
|
||||
ready.set(indexRoutingTable.allPrimaryShardsActive());
|
||||
}
|
||||
}
|
||||
|
||||
private void createIndexIfNecessary(ActionListener<String> listener) {
|
||||
if (clusterService.state().routingTable().hasIndex(index)) {
|
||||
listener.onResponse(index);
|
||||
|
@ -252,13 +217,6 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
protected void getAsync(String repository, String name, String path, long offset, ActionListener<CachedBlob> listener) {
|
||||
if ((lifecycle.started() && ready.get()) == false) {
|
||||
// TODO TBD can we just execute the GET request and let it fail if the index isn't ready yet?
|
||||
// We might get lucky and hit a started shard anyway.
|
||||
logger.debug("not ready : [{}]", CachedBlob.generateId(repository, name, path, offset));
|
||||
listener.onResponse(CachedBlob.CACHE_NOT_READY);
|
||||
return;
|
||||
}
|
||||
final GetRequest request = new GetRequest(index).id(CachedBlob.generateId(repository, name, path, offset));
|
||||
client.get(request, new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue