Add CcrRestoreSourceService to track sessions (#36578)

This commit is related to #36127. It adds a CcrRestoreSourceService to
track Engine.IndexCommitRef need for in-process file restores. When a
follower starts restoring a shard through the CcrRepository it opens a
session with the leader through the PutCcrRestoreSessionAction. The
leader responds to the request by telling the follower what files it
needs to fetch for a restore. This is not yet implemented.

Once, the restore is complete, the follower closes the session with the
DeleteCcrRestoreSessionAction action.
This commit is contained in:
Tim Brooks 2018-12-18 11:23:13 -07:00 committed by GitHub
parent 7de85f55e3
commit 1fa105658e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 787 additions and 4 deletions

View File

@ -34,7 +34,7 @@ public class Iterables {
public static <T> Iterable<T> concat(Iterable<T>... inputs) {
Objects.requireNonNull(inputs);
return new ConcatenatedIterable(inputs);
return new ConcatenatedIterable<>(inputs);
}
static class ConcatenatedIterable<T> implements Iterable<T> {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
@ -23,6 +24,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.XPackLicenseState;
@ -57,10 +59,13 @@ import org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
@ -110,6 +115,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private Client client;
/**
@ -150,8 +156,11 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
return emptyList();
}
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
new CcrRepositoryManager(settings, clusterService, client),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
@ -179,6 +188,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
@ -278,6 +291,11 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexEventListener(this.restoreSourceService.get());
}
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
}

View File

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.repositories;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import java.io.IOException;
import java.util.List;
public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {
public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/clear";
private ClearCcrRestoreSessionAction() {
super(NAME);
}
@Override
public ClearCcrRestoreSessionResponse newResponse() {
return new ClearCcrRestoreSessionResponse();
}
public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {
private final CcrRestoreSourceService ccrRestoreService;
@Inject
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new,
ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
this.ccrRestoreService = ccrRestoreService;
}
@Override
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
List<FailedNodeException> failures) {
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
}
@Override
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
return request.getRequest();
}
@Override
protected Response newNodeResponse() {
return new Response();
}
@Override
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
ccrRestoreService.closeSession(request.getSessionUUID());
return new Response(clusterService.localNode());
}
}
public static class Response extends BaseNodeResponse {
private Response() {
}
private Response(StreamInput in) throws IOException {
readFrom(in);
}
private Response(DiscoveryNode node) {
super(node);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
}
public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {
ClearCcrRestoreSessionResponse() {
}
ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
super(clusterName, chunkResponses, failures);
}
@Override
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Response::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
out.writeList(nodes);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.repositories;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {
private Request request;
ClearCcrRestoreSessionRequest() {
}
public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
super(nodeId);
this.request = request;
}
@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
request = new Request();
request.readFrom(streamInput);
}
@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
request.writeTo(streamOutput);
}
public Request getRequest() {
return request;
}
public static class Request extends BaseNodeRequest {
private String sessionUUID;
Request() {
}
public Request(String nodeId, String sessionUUID) {
super(nodeId);
this.sessionUUID = sessionUUID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}
public String getSessionUUID() {
return sessionUUID;
}
}
}

View File

@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.repositories;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import java.io.IOException;
public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse> {
public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/put";
private PutCcrRestoreSessionAction() {
super(NAME);
}
@Override
public PutCcrRestoreSessionResponse newResponse() {
return new PutCcrRestoreSessionResponse();
}
@Override
public Writeable.Reader<PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse> getResponseReader() {
return PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse::new;
}
public static class TransportPutCcrRestoreSessionAction
extends TransportSingleShardAction<PutCcrRestoreSessionRequest, PutCcrRestoreSessionResponse> {
private final IndicesService indicesService;
private final CcrRestoreSourceService ccrRestoreService;
@Inject
public TransportPutCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, TransportService transportService,
IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, PutCcrRestoreSessionRequest::new,
ThreadPool.Names.GENERIC);
this.indicesService = indicesService;
this.ccrRestoreService = ccrRestoreService;
}
@Override
protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionRequest request, ShardId shardId) throws IOException {
IndexShard indexShard = indicesService.getShardOrNull(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(shardId);
}
ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId());
}
@Override
protected PutCcrRestoreSessionResponse newResponse() {
return new PutCcrRestoreSessionResponse();
}
@Override
protected boolean resolveIndex(PutCcrRestoreSessionRequest request) {
return false;
}
@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
final ShardId shardId = request.request().getShardId();
return state.routingTable().shardRoutingTable(shardId).primaryShardIt();
}
}
public static class PutCcrRestoreSessionResponse extends ActionResponse {
private String nodeId;
PutCcrRestoreSessionResponse() {
}
PutCcrRestoreSessionResponse(String nodeId) {
this.nodeId = nodeId;
}
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
}
public String getNodeId() {
return nodeId;
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.repositories;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestoreSessionRequest> {
private String sessionUUID;
private ShardId shardId;
private Store.MetadataSnapshot metaData;
PutCcrRestoreSessionRequest() {
}
public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId, Store.MetadataSnapshot metaData) {
super(shardId.getIndexName());
this.sessionUUID = sessionUUID;
this.shardId = shardId;
this.metaData = metaData;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
shardId = ShardId.readShardId(in);
metaData = new Store.MetadataSnapshot(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
shardId.writeTo(out);
metaData.writeTo(out);
}
public String getSessionUUID() {
return sessionUUID;
}
public ShardId getShardId() {
return shardId;
}
public Store.MetadataSnapshot getMetaData() {
return metaData;
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
@ -36,6 +37,10 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest;
import java.io.IOException;
import java.util.ArrayList;
@ -81,7 +86,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}
@Override
protected void doClose() throws IOException {
protected void doClose() {
}
@ -227,19 +232,49 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId,
RecoveryState recoveryState) {
// TODO: Add timeouts to network calls / the restore process.
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
throw new IndexShardRecoveryException(shardId, "failed to create empty store", e);
} finally {
store.decRef();
}
Store.MetadataSnapshot recoveryMetadata;
try {
recoveryMetadata = indexShard.snapshotStoreMetadata();
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "failed access store metadata", e);
}
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet();
String nodeId = response.getNodeId();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
if (response.hasFailures()) {
throw response.failures().get(0);
}
}
}

View File

@ -0,0 +1,175 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.repository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class);
private final Map<String, RestoreContext> onGoingRestores = ConcurrentCollections.newConcurrentMap();
private final Map<IndexShard, HashSet<String>> sessionsForShard = new HashMap<>();
private final CopyOnWriteArrayList<Consumer<String>> openSessionListeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
public CcrRestoreSourceService(Settings settings) {
super(settings);
}
@Override
public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
HashSet<String> sessions = sessionsForShard.remove(indexShard);
if (sessions != null) {
for (String sessionUUID : sessions) {
RestoreContext restore = onGoingRestores.remove(sessionUUID);
IOUtils.closeWhileHandlingException(restore);
}
}
}
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected synchronized void doClose() throws IOException {
sessionsForShard.clear();
IOUtils.closeWhileHandlingException(onGoingRestores.values());
onGoingRestores.clear();
}
// TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested,
// these should be removed.
public void addOpenSessionListener(Consumer<String> listener) {
openSessionListeners.add(listener);
}
public void addCloseSessionListener(Consumer<String> listener) {
closeSessionListeners.add(listener);
}
// default visibility for testing
synchronized HashSet<String> getSessionsForShard(IndexShard indexShard) {
return sessionsForShard.get(indexShard);
}
// default visibility for testing
synchronized RestoreContext getOngoingRestore(String sessionUUID) {
return onGoingRestores.get(sessionUUID);
}
// TODO: Add a local timeout for the session. This timeout might might be for the entire session to be
// complete. Or it could be for session to have been touched.
public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
boolean success = false;
RestoreContext restore = null;
try {
if (onGoingRestores.containsKey(sessionUUID)) {
logger.debug("not opening new session [{}] as it already exists", sessionUUID);
restore = onGoingRestores.get(sessionUUID);
} else {
logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
if (indexShard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed");
}
restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
onGoingRestores.put(sessionUUID, restore);
openSessionListeners.forEach(c -> c.accept(sessionUUID));
HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
sessions.add(sessionUUID);
}
Store.MetadataSnapshot metaData = restore.getMetaData();
success = true;
return metaData;
} finally {
if (success == false) {
onGoingRestores.remove(sessionUUID);
IOUtils.closeWhileHandlingException(restore);
}
}
}
public synchronized void closeSession(String sessionUUID) {
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
RestoreContext restore = onGoingRestores.remove(sessionUUID);
if (restore == null) {
logger.info("could not close session [{}] because session not found", sessionUUID);
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
}
IOUtils.closeWhileHandlingException(restore);
}
private class RestoreContext implements Closeable {
private final String sessionUUID;
private final IndexShard indexShard;
private final Engine.IndexCommitRef commitRef;
private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) {
this.sessionUUID = sessionUUID;
this.indexShard = indexShard;
this.commitRef = commitRef;
}
Store.MetadataSnapshot getMetaData() throws IOException {
indexShard.store().incRef();
try {
return indexShard.store().getMetadata(commitRef.getIndexCommit());
} finally {
indexShard.store().decRef();
}
}
@Override
public void close() {
assert Thread.holdsLock(CcrRestoreSourceService.this);
removeSessionForShard(sessionUUID, indexShard);
IOUtils.closeWhileHandlingException(commitRef);
}
private void removeSessionForShard(String sessionUUID, IndexShard indexShard) {
logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId());
HashSet<String> sessions = sessionsForShard.get(indexShard);
if (sessions != null) {
sessions.remove(sessionUUID);
if (sessions.isEmpty()) {
sessionsForShard.remove(indexShard);
}
}
}
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
@ -31,9 +32,11 @@ import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
@ -151,6 +154,47 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
}
public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
Set<String> sessionsOpened = ConcurrentCollections.newConcurrentSet();
Set<String> sessionsClosed = ConcurrentCollections.newConcurrentSet();
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
restoreSourceService.addOpenSessionListener(sessionsOpened::add);
restoreSourceService.addCloseSessionListener(sessionsClosed::add);
}
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();
assertEquals(numberOfPrimaryShards, sessionsOpened.size());
assertEquals(numberOfPrimaryShards, sessionsClosed.size());
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
}
private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {

View File

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.repository;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.HashSet;
public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
private CcrRestoreSourceService restoreSourceService;
@Before
public void setUp() throws Exception {
super.setUp();
restoreSourceService = new CcrRestoreSourceService(Settings.EMPTY);
}
public void testOpenSession() throws IOException {
IndexShard indexShard1 = newStartedShard(true);
IndexShard indexShard2 = newStartedShard(true);
final String sessionUUID1 = UUIDs.randomBase64UUID();
final String sessionUUID2 = UUIDs.randomBase64UUID();
final String sessionUUID3 = UUIDs.randomBase64UUID();
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1));
HashSet<String> sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1);
assertEquals(1, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID1));
assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1));
sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1);
assertEquals(2, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID2));
assertNull(restoreSourceService.getSessionsForShard(indexShard2));
assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2));
sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2);
assertEquals(1, sessionsForShard.size());
assertTrue(sessionsForShard.contains(sessionUUID3));
restoreSourceService.closeSession(sessionUUID1);
restoreSourceService.closeSession(sessionUUID2);
restoreSourceService.closeSession(sessionUUID3);
closeShards(indexShard1, indexShard2);
}
public void testCannotOpenSessionForClosedShard() throws IOException {
IndexShard indexShard = newStartedShard(true);
closeShards(indexShard);
String sessionUUID = UUIDs.randomBase64UUID();
expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID));
}
public void testCloseSession() throws IOException {
IndexShard indexShard1 = newStartedShard(true);
IndexShard indexShard2 = newStartedShard(true);
final String sessionUUID1 = UUIDs.randomBase64UUID();
final String sessionUUID2 = UUIDs.randomBase64UUID();
final String sessionUUID3 = UUIDs.randomBase64UUID();
restoreSourceService.openSession(sessionUUID1, indexShard1);
restoreSourceService.openSession(sessionUUID2, indexShard1);
restoreSourceService.openSession(sessionUUID3, indexShard2);
assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size());
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size());
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2));
assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3));
restoreSourceService.closeSession(sessionUUID1);
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size());
assertNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1));
assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2));
restoreSourceService.closeSession(sessionUUID2);
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID2));
restoreSourceService.closeSession(sessionUUID3);
assertNull(restoreSourceService.getSessionsForShard(indexShard2));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID3));
closeShards(indexShard1, indexShard2);
}
public void testCloseShardListenerFunctionality() throws IOException {
IndexShard indexShard1 = newStartedShard(true);
IndexShard indexShard2 = newStartedShard(true);
final String sessionUUID1 = UUIDs.randomBase64UUID();
final String sessionUUID2 = UUIDs.randomBase64UUID();
final String sessionUUID3 = UUIDs.randomBase64UUID();
restoreSourceService.openSession(sessionUUID1, indexShard1);
restoreSourceService.openSession(sessionUUID2, indexShard1);
restoreSourceService.openSession(sessionUUID3, indexShard2);
assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size());
assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size());
restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY);
assertNull(restoreSourceService.getSessionsForShard(indexShard1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID1));
assertNull(restoreSourceService.getOngoingRestore(sessionUUID2));
restoreSourceService.closeSession(sessionUUID3);
closeShards(indexShard1, indexShard2);
}
}