From 36889e8a2f8320a4e8a7528b7247f087346b1a85 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Jan 2019 10:11:18 +0100 Subject: [PATCH] Remove Custom Listeners from SnapshotsService (#37629) * Remove Custom Listeners from SnapshotsService Motivations: * Shorten the code some more * Use ActionListener#wrap to get easy to reason about behavior in failure scenarios * Remove duplication in the logic of handling snapshot completion listeners (listeners removing themselves and comparing snapshots to their targets) * Also here, move all listener handling into `SnapshotsService` and remove custom listener class by putting listeners in a map --- .../create/TransportCreateSnapshotAction.java | 42 +---- .../delete/TransportDeleteSnapshotAction.java | 13 +- .../snapshots/SnapshotsService.java | 178 ++++++------------ 3 files changed, 72 insertions(+), 161 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index abd50f0785a..fe4089df945 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -28,8 +28,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.snapshots.Snapshot; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -72,37 +70,13 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction listener) { - snapshotsService.createSnapshot(request, new SnapshotsService.CreateSnapshotListener() { - @Override - public void onResponse(Snapshot snapshotCreated) { - if (request.waitForCompletion()) { - snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() { - @Override - public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) { - if (snapshotCreated.equals(snapshot)) { - listener.onResponse(new CreateSnapshotResponse(snapshotInfo)); - snapshotsService.removeListener(this); - } - } - - @Override - public void onSnapshotFailure(Snapshot snapshot, Exception e) { - if (snapshotCreated.equals(snapshot)) { - listener.onFailure(e); - snapshotsService.removeListener(this); - } - } - }); - } else { - listener.onResponse(new CreateSnapshotResponse()); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + final ActionListener listener) { + if (request.waitForCompletion()) { + snapshotsService.executeSnapshot(request, + ActionListener.wrap(snapshotInfo-> listener.onResponse(new CreateSnapshotResponse(snapshotInfo)), listener::onFailure)); + } else { + snapshotsService.createSnapshot(request, + ActionListener.wrap(snapshot -> listener.onResponse(new CreateSnapshotResponse()), listener::onFailure)); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index e53330349b3..dfb38aaed21 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -67,16 +67,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction listener) { - snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), new SnapshotsService.DeleteSnapshotListener() { - @Override - public void onResponse() { - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, false); + snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), + ActionListener.wrap(v -> listener.onResponse(new AcknowledgedResponse(true)), listener::onFailure), false); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 4e8c26ea593..af6d7055e53 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -80,6 +80,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -91,10 +92,10 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; *

* A typical snapshot creating process looks like this: *

    - *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, CreateSnapshotListener)} is called and makes sure that + *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • *
  • When cluster state is updated - * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes + * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • @@ -118,7 +119,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final ThreadPool threadPool; - private final CopyOnWriteArrayList snapshotCompletionListeners = new CopyOnWriteArrayList<>(); + private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); @Inject public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, @@ -225,6 +226,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus return Collections.unmodifiableList(snapshotList); } + /** + * Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of + * the snapshot. + * + * @param request snapshot request + * @param listener snapshot completion listener + */ + public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure)); + } + /** * Initializes the snapshotting process. *

    @@ -234,7 +246,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param request snapshot request * @param listener snapshot creation listener */ - public void createSnapshot(final CreateSnapshotRequest request, final CreateSnapshotListener listener) { + public void createSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); @@ -351,7 +363,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private void beginSnapshot(final ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, - final CreateSnapshotListener userCreateSnapshotListener) { + final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { boolean snapshotCreated; @@ -491,11 +503,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final SnapshotsInProgress.Entry snapshot; private final boolean snapshotCreated; - private final CreateSnapshotListener userCreateSnapshotListener; + private final ActionListener userCreateSnapshotListener; private final Exception e; CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, - CreateSnapshotListener userCreateSnapshotListener, Exception e) { + ActionListener userCreateSnapshotListener, Exception e) { this.snapshot = snapshot; this.snapshotCreated = snapshotCreated; this.userCreateSnapshotListener = userCreateSnapshotListener; @@ -781,9 +793,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus entries.add(updatedSnapshot); // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() { + deleteSnapshot(snapshot.snapshot(), new ActionListener() { @Override - public void onResponse() { + public void onResponse(Void aVoid) { logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); } @@ -1077,15 +1089,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - for (SnapshotCompletionListener listener : snapshotCompletionListeners) { + final List> completionListeners = snapshotCompletionListeners.remove(snapshot); + if (completionListeners != null) { try { - if (snapshotInfo != null) { - listener.onSnapshotCompletion(snapshot, snapshotInfo); + if (snapshotInfo == null) { + ActionListener.onFailure(completionListeners, failure); } else { - listener.onSnapshotFailure(snapshot, failure); + ActionListener.onResponse(completionListeners, snapshotInfo); } - } catch (Exception t) { - logger.warn(() -> new ParameterizedMessage("failed to notify listener [{}]", listener), t); + } catch (Exception e) { + logger.warn("Failed to notify listeners", e); } } if (listener != null) { @@ -1103,7 +1116,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param snapshotName snapshotName * @param listener listener */ - public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener, + public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener, final boolean immediatePriority) { // First, look for the snapshot in the repository final Repository repository = repositoriesService.repository(repositoryName); @@ -1143,7 +1156,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param listener listener * @param repositoryStateId the unique id for the state of the repository */ - private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener, final long repositoryStateId, + private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId, final boolean immediatePriority) { Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL; clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { @@ -1234,8 +1247,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); - snapshots = new SnapshotsInProgress(newSnapshot); - clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, snapshots); + clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); } return clusterStateBuilder.build(); } @@ -1249,50 +1261,34 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (waitForSnapshot) { logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); - addListener(new SnapshotCompletionListener() { - @Override - public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapshotInfo) { - if (completedSnapshot.equals(snapshot)) { - logger.debug("deleted snapshot completed - deleting files"); - removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), - listener, true); - - } catch (Exception ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); - } - } - ); - } - } - - @Override - public void onSnapshotFailure(Snapshot failedSnapshot, Exception e) { - if (failedSnapshot.equals(snapshot)) { - logger.warn("deleted snapshot failed - deleting files", e); - removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + addListener(snapshot, ActionListener.wrap( + snapshotInfo -> { + logger.debug("deleted snapshot completed - deleting files"); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { - deleteSnapshot(failedSnapshot.getRepository(), - failedSnapshot.getSnapshotId().getName(), - listener, - true); - } catch (SnapshotMissingException smex) { - logger.info(() -> new ParameterizedMessage( - "Tried deleting in-progress snapshot [{}], but it " + - "could not be found after failing to abort.", - smex.getSnapshotName()), e); - listener.onFailure(new SnapshotException(snapshot, - "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + - "could not be found after failing to abort.", smex)); + deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); + } catch (Exception ex) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); } - }); - } + } + ); + }, + e -> { + logger.warn("deleted snapshot failed - deleting files", e); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); + } catch (SnapshotMissingException smex) { + logger.info(() -> new ParameterizedMessage( + "Tried deleting in-progress snapshot [{}], but it could not be found after failing to abort.", + smex.getSnapshotName()), e); + listener.onFailure(new SnapshotException(snapshot, + "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + + "could not be found after failing to abort.", smex)); + } + }); } - }); + )); } else { logger.debug("deleted snapshot is not running - deleting files"); deleteSnapshotFromRepository(snapshot, listener, repositoryStateId); @@ -1335,8 +1331,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param listener listener * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ - private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable final DeleteSnapshotListener listener, - long repositoryStateId) { + private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { Repository repository = repositoriesService.repository(snapshot.getRepository()); @@ -1354,7 +1349,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state. */ private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure, - @Nullable final DeleteSnapshotListener listener) { + @Nullable final ActionListener listener) { clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -1388,7 +1383,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (failure != null) { listener.onFailure(failure); } else { - listener.onResponse(); + listener.onResponse(null); } } } @@ -1508,19 +1503,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus /** * Adds snapshot completion listener * + * @param snapshot Snapshot to listen for * @param listener listener */ - public void addListener(SnapshotCompletionListener listener) { - this.snapshotCompletionListeners.add(listener); - } - - /** - * Removes snapshot completion listener - * - * @param listener listener - */ - public void removeListener(SnapshotCompletionListener listener) { - this.snapshotCompletionListeners.remove(listener); + private void addListener(Snapshot snapshot, ActionListener listener) { + snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); } @Override @@ -1541,45 +1528,4 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public RepositoriesService getRepositoriesService() { return repositoriesService; } - - /** - * Listener for create snapshot operation - */ - public interface CreateSnapshotListener { - - /** - * Called when snapshot has successfully started - * - * @param snapshot snapshot that was created - */ - void onResponse(Snapshot snapshot); - - /** - * Called if a snapshot operation couldn't start - */ - void onFailure(Exception e); - } - - /** - * Listener for delete snapshot operation - */ - public interface DeleteSnapshotListener { - - /** - * Called if delete operation was successful - */ - void onResponse(); - - /** - * Called if delete operation failed - */ - void onFailure(Exception e); - } - - public interface SnapshotCompletionListener { - - void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo); - - void onSnapshotFailure(Snapshot snapshot, Exception e); - } }