This commit adds more logging to the actions that the SLM retention task performs. The extra logging will help diagnose issues that arise while retention is running.
parent a68fafd64b
commit 22cf1140eb

@@ -50,6 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -84,6 +85,15 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
         this.threadPool = threadPool;
     }
 
+    private static String formatSnapshots(Map<String, List<SnapshotInfo>> snapshotMap) {
+        return snapshotMap.entrySet().stream()
+            .map(e -> e.getKey() + ": [" + e.getValue().stream()
+                .map(si -> si.snapshotId().getName())
+                .collect(Collectors.joining(","))
+                + "]")
+            .collect(Collectors.joining(","));
+    }
+
     @Override
     public void triggered(SchedulerEngine.Event event) {
         assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) ||
@@ -111,6 +121,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                 slmStats.retentionFailed();
                 updateStateWithStats(slmStats);
             } finally {
+                logger.info("SLM retention snapshot cleanup task completed with error");
                 running.set(false);
             }
         };
@@ -122,18 +133,20 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
             slmStats.retentionRun();
             // Find all SLM policies that have retention enabled
             final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);
+            logger.trace("policies with retention enabled: {}", policiesWithRetention.keySet());
 
             // For those policies (there may be more than one for the same repo),
             // return the repos that we need to get the snapshots for
             final Set<String> repositioriesToFetch = policiesWithRetention.values().stream()
                 .map(SnapshotLifecyclePolicy::getRepository)
                 .collect(Collectors.toSet());
+            logger.trace("fetching snapshots from repositories: {}", repositioriesToFetch);
 
             if (repositioriesToFetch.isEmpty()) {
                 running.set(false);
+                logger.info("there are no repositories to fetch, SLM retention snapshot cleanup task complete");
                 return;
             }
 
             // Finally, asynchronously retrieve all the snapshots, deleting them serially,
             // before updating the cluster state with the new metrics and setting 'running'
             // back to false
@@ -141,6 +154,9 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                 @Override
                 public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
                     try {
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots));
+                        }
                         // Find all the snapshots that are past their retention date
                         final Map<String, List<SnapshotInfo>> snapshotsToBeDeleted = allSnapshots.entrySet().stream()
                             .collect(Collectors.toMap(Map.Entry::getKey,
@@ -148,11 +164,16 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                                     .filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention))
                                     .collect(Collectors.toList())));
 
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("snapshots eligible for deletion: [{}]", formatSnapshots(snapshotsToBeDeleted));
+                        }
+
                         // Finally, delete the snapshots that need to be deleted
                         maybeDeleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats);
 
                         updateStateWithStats(slmStats);
                     } finally {
+                        logger.info("SLM retention snapshot cleanup task complete");
                         running.set(false);
                     }
                 }
@@ -240,6 +261,9 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
         final CountDown countDown = new CountDown(repositories.size());
         final Runnable onComplete = () -> {
             if (countDown.countDown()) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("retrieved snapshots: {}", formatSnapshots(snapshots));
+                }
                 listener.onResponse(snapshots);
             }
         };
@@ -309,6 +333,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
 
         ClusterState state = clusterService.state();
         if (okayToDeleteSnapshots(state)) {
+            logger.trace("there are no snapshots currently running, proceeding with snapshot deletion of [{}]",
+                formatSnapshots(snapshotsToDelete));
             deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
         } else {
             logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
@@ -318,6 +344,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                 new NoSnapshotRunningListener(observer,
                     newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
                         try {
+                            logger.trace("received cluster state without running snapshots, proceeding with snapshot deletion of [{}]",
+                                formatSnapshots(snapshotsToDelete));
                             deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
                         } finally {
                             latch.countDown();
@@ -328,7 +356,11 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                         throw new ElasticsearchException(e);
                     }));
             try {
-                latch.await();
+                logger.trace("waiting for snapshot deletion to complete");
+                // Wait until we find a cluster state not running a snapshot operation.
+                // If we can't find one within a day, give up and throw an error.
+                latch.await(1, TimeUnit.DAYS);
+                logger.trace("deletion complete");
             } catch (InterruptedException e) {
                 throw new ElasticsearchException(e);
             }
@@ -447,24 +479,28 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
         // Cannot delete during a snapshot
         final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
         if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) {
+            logger.trace("deletion cannot proceed as there are snapshots in progress");
             return false;
         }
 
         // Cannot delete during an existing delete
         final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
         if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
+            logger.trace("deletion cannot proceed as there are snapshot deletions in progress");
            return false;
         }
 
         // Cannot delete while a repository is being cleaned
         final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
         if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
+            logger.trace("deletion cannot proceed as there are repository cleanups in progress");
             return false;
         }
 
         // Cannot delete during a restore
         final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
         if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
+            logger.trace("deletion cannot proceed as there are snapshot restores in progress");
             return false;
         }
 
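Most of the new trace statements rely on the small formatSnapshots helper added in the second hunk, and the more expensive ones are wrapped in logger.isTraceEnabled() checks so the per-repository snapshot map is only rendered when trace logging is actually enabled. Below is a minimal, self-contained sketch of the output format that helper produces; it uses plain strings in place of SnapshotInfo, and the class and variable names are illustrative rather than taken from the Elasticsearch source.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FormatSnapshotsSketch {
    // Simplified stand-in for formatSnapshots(Map<String, List<SnapshotInfo>>):
    // joins snapshot names per repository as "repo: [name1,name2]".
    static String formatSnapshots(Map<String, List<String>> snapshotsByRepo) {
        return snapshotsByRepo.entrySet().stream()
            .map(e -> e.getKey() + ": [" + String.join(",", e.getValue()) + "]")
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, List<String>> snapshots = Map.of(
            "my-repo", List.of("snap-1", "snap-2"),
            "backup-repo", List.of("daily-1"));
        // Prints something like: my-repo: [snap-1,snap-2],backup-repo: [daily-1]
        // (entry order is unspecified for Map.of)
        System.out.println(formatSnapshots(snapshots));
    }
}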
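The one behavioural change in the diff replaces the unbounded latch.await() with latch.await(1, TimeUnit.DAYS). Note that CountDownLatch.await(long, TimeUnit) simply returns false if the timeout elapses; the short sketch below (illustrative names, not from the Elasticsearch source) demonstrates that behaviour with a small timeout.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedAwaitSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Nothing counts the latch down, so await(...) times out and returns false
        // (the commit waits up to one day for snapshot deletion to finish).
        boolean completed = latch.await(1, TimeUnit.SECONDS);
        System.out.println("completed before timeout: " + completed);
    }
}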