Remove Custom Listeners from SnapshotsService (#37629)

* Remove Custom Listeners from SnapshotsService

Motivations:
    * Shorten the code some more
    * Use ActionListener#wrap to get easy to reason about behavior in failure scenarios
    * Remove duplication in the logic of handling snapshot completion listeners (listeners removing themselves and comparing snapshots to their targets)
        * Also here, move all listener handling into `SnapshotsService` and remove custom listener class by putting listeners in a map
This commit is contained in:
Armin Braun 2019-01-24 10:11:18 +01:00 committed by GitHub
parent bdef2ab8c0
commit 36889e8a2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 161 deletions

View File

@ -28,8 +28,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -72,37 +70,13 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
snapshotsService.createSnapshot(request, new SnapshotsService.CreateSnapshotListener() {
@Override
public void onResponse(Snapshot snapshotCreated) {
if (request.waitForCompletion()) {
snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() {
@Override
public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) {
if (snapshotCreated.equals(snapshot)) {
listener.onResponse(new CreateSnapshotResponse(snapshotInfo));
snapshotsService.removeListener(this);
}
}
@Override
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshotCreated.equals(snapshot)) {
listener.onFailure(e);
snapshotsService.removeListener(this);
}
}
});
} else {
listener.onResponse(new CreateSnapshotResponse());
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request,
ActionListener.wrap(snapshotInfo-> listener.onResponse(new CreateSnapshotResponse(snapshotInfo)), listener::onFailure));
} else {
snapshotsService.createSnapshot(request,
ActionListener.wrap(snapshot -> listener.onResponse(new CreateSnapshotResponse()), listener::onFailure));
}
}
}

View File

@ -67,16 +67,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
@Override
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), new SnapshotsService.DeleteSnapshotListener() {
@Override
public void onResponse() {
listener.onResponse(new AcknowledgedResponse(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, false);
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(),
ActionListener.wrap(v -> listener.onResponse(new AcknowledgedResponse(true)), listener::onFailure), false);
}
}

View File

@ -80,6 +80,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@ -91,10 +92,10 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
* <p>
* A typical snapshot creating process looks like this:
* <ul>
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, CreateSnapshotListener)} is called and makes sure that
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
@ -118,7 +119,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private final ThreadPool threadPool;
private final CopyOnWriteArrayList<SnapshotCompletionListener> snapshotCompletionListeners = new CopyOnWriteArrayList<>();
private final Map<Snapshot, List<ActionListener<SnapshotInfo>>> snapshotCompletionListeners = new ConcurrentHashMap<>();
@Inject
public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
@ -225,6 +226,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
return Collections.unmodifiableList(snapshotList);
}
/**
* Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of
* the snapshot.
*
* @param request snapshot request
* @param listener snapshot completion listener
*/
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure));
}
/**
* Initializes the snapshotting process.
* <p>
@ -234,7 +246,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param request snapshot request
* @param listener snapshot creation listener
*/
public void createSnapshot(final CreateSnapshotRequest request, final CreateSnapshotListener listener) {
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
@ -351,7 +363,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private void beginSnapshot(final ClusterState clusterState,
final SnapshotsInProgress.Entry snapshot,
final boolean partial,
final CreateSnapshotListener userCreateSnapshotListener) {
final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
boolean snapshotCreated;
@ -491,11 +503,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private final SnapshotsInProgress.Entry snapshot;
private final boolean snapshotCreated;
private final CreateSnapshotListener userCreateSnapshotListener;
private final ActionListener<Snapshot> userCreateSnapshotListener;
private final Exception e;
CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated,
CreateSnapshotListener userCreateSnapshotListener, Exception e) {
ActionListener<Snapshot> userCreateSnapshotListener, Exception e) {
this.snapshot = snapshot;
this.snapshotCreated = snapshotCreated;
this.userCreateSnapshotListener = userCreateSnapshotListener;
@ -781,9 +793,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entries.add(updatedSnapshot);
// Clean up the snapshot that failed to start from the old master
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
deleteSnapshot(snapshot.snapshot(), new ActionListener<Void>() {
@Override
public void onResponse() {
public void onResponse(Void aVoid) {
logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot());
}
@ -1077,15 +1089,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
for (SnapshotCompletionListener listener : snapshotCompletionListeners) {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
if (snapshotInfo != null) {
listener.onSnapshotCompletion(snapshot, snapshotInfo);
if (snapshotInfo == null) {
ActionListener.onFailure(completionListeners, failure);
} else {
listener.onSnapshotFailure(snapshot, failure);
ActionListener.onResponse(completionListeners, snapshotInfo);
}
} catch (Exception t) {
logger.warn(() -> new ParameterizedMessage("failed to notify listener [{}]", listener), t);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
}
if (listener != null) {
@ -1103,7 +1116,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param snapshotName snapshotName
* @param listener listener
*/
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener,
public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener<Void> listener,
final boolean immediatePriority) {
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
@ -1143,7 +1156,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param listener listener
* @param repositoryStateId the unique id for the state of the repository
*/
private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener, final long repositoryStateId,
private void deleteSnapshot(final Snapshot snapshot, final ActionListener<Void> listener, final long repositoryStateId,
final boolean immediatePriority) {
Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL;
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
@ -1234,8 +1247,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards);
snapshots = new SnapshotsInProgress(newSnapshot);
clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, snapshots);
clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot));
}
return clusterStateBuilder.build();
}
@ -1249,50 +1261,34 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (waitForSnapshot) {
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(new SnapshotCompletionListener() {
@Override
public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapshotInfo) {
if (completedSnapshot.equals(snapshot)) {
logger.debug("deleted snapshot completed - deleting files");
removeListener(this);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(),
listener, true);
} catch (Exception ex) {
logger.warn(() ->
new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
}
}
);
}
}
@Override
public void onSnapshotFailure(Snapshot failedSnapshot, Exception e) {
if (failedSnapshot.equals(snapshot)) {
logger.warn("deleted snapshot failed - deleting files", e);
removeListener(this);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
addListener(snapshot, ActionListener.wrap(
snapshotInfo -> {
logger.debug("deleted snapshot completed - deleting files");
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
deleteSnapshot(failedSnapshot.getRepository(),
failedSnapshot.getSnapshotId().getName(),
listener,
true);
} catch (SnapshotMissingException smex) {
logger.info(() -> new ParameterizedMessage(
"Tried deleting in-progress snapshot [{}], but it " +
"could not be found after failing to abort.",
smex.getSnapshotName()), e);
listener.onFailure(new SnapshotException(snapshot,
"Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " +
"could not be found after failing to abort.", smex));
deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true);
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
}
});
}
}
);
},
e -> {
logger.warn("deleted snapshot failed - deleting files", e);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true);
} catch (SnapshotMissingException smex) {
logger.info(() -> new ParameterizedMessage(
"Tried deleting in-progress snapshot [{}], but it could not be found after failing to abort.",
smex.getSnapshotName()), e);
listener.onFailure(new SnapshotException(snapshot,
"Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " +
"could not be found after failing to abort.", smex));
}
});
}
});
));
} else {
logger.debug("deleted snapshot is not running - deleting files");
deleteSnapshotFromRepository(snapshot, listener, repositoryStateId);
@ -1335,8 +1331,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param listener listener
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
*/
private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable final DeleteSnapshotListener listener,
long repositoryStateId) {
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
Repository repository = repositoriesService.repository(snapshot.getRepository());
@ -1354,7 +1349,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state.
*/
private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure,
@Nullable final DeleteSnapshotListener listener) {
@Nullable final ActionListener<Void> listener) {
clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
@ -1388,7 +1383,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onResponse();
listener.onResponse(null);
}
}
}
@ -1508,19 +1503,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
/**
* Adds snapshot completion listener
*
* @param snapshot Snapshot to listen for
* @param listener listener
*/
public void addListener(SnapshotCompletionListener listener) {
this.snapshotCompletionListeners.add(listener);
}
/**
* Removes snapshot completion listener
*
* @param listener listener
*/
public void removeListener(SnapshotCompletionListener listener) {
this.snapshotCompletionListeners.remove(listener);
private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listener) {
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
}
@Override
@ -1541,45 +1528,4 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public RepositoriesService getRepositoriesService() {
return repositoriesService;
}
/**
* Listener for create snapshot operation
*/
public interface CreateSnapshotListener {
/**
* Called when snapshot has successfully started
*
* @param snapshot snapshot that was created
*/
void onResponse(Snapshot snapshot);
/**
* Called if a snapshot operation couldn't start
*/
void onFailure(Exception e);
}
/**
* Listener for delete snapshot operation
*/
public interface DeleteSnapshotListener {
/**
* Called if delete operation was successful
*/
void onResponse();
/**
* Called if delete operation failed
*/
void onFailure(Exception e);
}
public interface SnapshotCompletionListener {
void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo);
void onSnapshotFailure(Snapshot snapshot, Exception e);
}
}