[Transform] automatic deletion of old checkpoints (#49496)

add automatic deletion of old checkpoints based on count and time
This commit is contained in:
Hendrik Muhs 2019-12-04 07:55:57 +01:00
parent 602369c8b9
commit c33be29dc7
7 changed files with 262 additions and 32 deletions

View File

@ -50,7 +50,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
// checkpoint of the indexes (sequence id's)
public static final ParseField INDICES = new ParseField("indices");
private static final String NAME = "data_frame_transform_checkpoint";
public static final String NAME = "data_frame_transform_checkpoint";
private static final ConstructingObjectParser<TransformCheckpoint, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<TransformCheckpoint, Void> LENIENT_PARSER = createParser(true);

View File

@ -39,6 +39,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@ -63,6 +64,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -146,16 +148,18 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
);
executeAsyncWithOrigin(
client,
@ -177,17 +181,18 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
).setQuery(
);
deleteByQueryRequest.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
)
).setIndicesOptions(IndicesOptions.lenientExpandOpen());
);
executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
@ -206,6 +211,41 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
);
}
/**
 * Removes checkpoint documents of one transform from the internal transform indices.
 *
 * A checkpoint document is only removed if its checkpoint number is lower than {@code deleteCheckpointsBelow}
 * AND its timestamp is lower than {@code deleteOlderThan}; all conditions are combined as filters.
 *
 * @param transformId id of the transform whose checkpoints should be pruned
 * @param deleteCheckpointsBelow exclusive upper bound for the checkpoint number of documents to delete
 * @param deleteOlderThan exclusive upper bound (epoch milliseconds) for the timestamp of documents to delete
 * @param listener receives the number of deleted documents, or a failure if the delete by query reported
 *                 bulk or search failures or could not be executed
 */
@Override
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
    // restrict to checkpoint documents of this transform ...
    QueryBuilder belongsToTransform = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
    QueryBuilder isCheckpointDocument = QueryBuilders.termQuery(
        TransformField.INDEX_DOC_TYPE.getPreferredName(),
        TransformCheckpoint.NAME
    );
    // ... that are outdated by number as well as by age
    QueryBuilder checkpointNumberTooLow = QueryBuilders.rangeQuery(TransformCheckpoint.CHECKPOINT.getPreferredName())
        .lt(deleteCheckpointsBelow);
    QueryBuilder timestampTooOld = QueryBuilders.rangeQuery(TransformField.TIMESTAMP_MILLIS.getPreferredName())
        .lt(deleteOlderThan)
        .format("epoch_millis");

    DeleteByQueryRequest pruneRequest = createDeleteByQueryRequest();
    pruneRequest.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
    pruneRequest.setQuery(
        QueryBuilders.boolQuery()
            .filter(belongsToTransform)
            .filter(isCheckpointDocument)
            .filter(checkpointNumberTooLow)
            .filter(timestampTooOld)
    );

    logger.debug("Deleting old checkpoints using {}", pruneRequest.getSearchRequest());

    executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, pruneRequest, ActionListener.wrap(dbqResponse -> {
        if (dbqResponse.getBulkFailures().isEmpty() == false || dbqResponse.getSearchFailures().isEmpty() == false) {
            // surface the failure together with the status derived from the response
            Tuple<RestStatus, Throwable> failure = getStatusAndReason(dbqResponse);
            listener.onFailure(new ElasticsearchStatusException(failure.v2().getMessage(), failure.v1(), failure.v2()));
        } else {
            listener.onResponse(dbqResponse.getDeleted());
        }
    }, e -> listener.onFailure(e)));
}
private void putTransformConfiguration(
TransformConfig transformConfig,
DocWriteRequest.OpType optType,
@ -419,9 +459,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
@Override
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest().setAbortOnVersionConflict(false); // since these documents are not
// updated, a conflict just means it was
// deleted previously
DeleteByQueryRequest request = createDeleteByQueryRequest();
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
@ -675,4 +713,22 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
}
return new Tuple<>(status, reason);
}
/**
 * Factory for delete by query requests that carry the defaults shared by the delete operations of this class:
 * version conflicts do not abort the request, the number of slices is chosen automatically, indices are
 * resolved leniently (expand to open indices) and hits are sorted in index order.
 *
 * Callers still have to set the target indices and the query.
 *
 * @return a fresh DeleteByQueryRequest with the defaults applied
 */
private static DeleteByQueryRequest createDeleteByQueryRequest() {
    final DeleteByQueryRequest request = new DeleteByQueryRequest();

    request.setAbortOnVersionConflict(false);
    request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
    request.setIndicesOptions(IndicesOptions.lenientExpandOpen());

    // sorting by index order disables scoring
    request.getSearchRequest().source().sort(SINGLE_MAPPING_NAME);

    return request;
}
}

View File

@ -72,6 +72,18 @@ public interface TransformConfigManager {
*/
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);
/**
 * Deletes stored checkpoint documents for the given transformId, based on checkpoint number and age.
 *
 * Both criteria MUST apply for the deletion to happen: a checkpoint is only deleted if its checkpoint number is
 * lower than {@code deleteCheckpointsBelow} AND its timestamp is lower than {@code deleteOlderThan}. Both bounds
 * are exclusive.
 *
 * @param transformId The transform ID referenced by the documents
 * @param deleteCheckpointsBelow checkpoints with a number lower than this are candidates for deletion
 * @param deleteOlderThan checkpoints with a timestamp (epoch milliseconds) lower than this are candidates for deletion
 * @param listener listener to alert on completion, returning number of deleted checkpoints
 */
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);
/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
*

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@ -55,8 +56,9 @@ public final class TransformInternalIndex {
* progress::docs_processed, progress::docs_indexed,
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
* stats::exponential_avg_documents_processed
*
* version 3 (7.5): rename to .transform-internal-xxx
* version 4 (7.6): state::should_stop_at_checkpoint
* checkpoint::checkpoint
*/
// constants for mappings
@ -115,26 +117,27 @@ public final class TransformInternalIndex {
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
builder
.startObject(PROPERTIES)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.field(TYPE, DATE)
.endObject()
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
@ -260,9 +263,6 @@ public final class TransformInternalIndex {
.endObject()
.endObject()
.endObject();
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
// .startObject("checkpointing").field(ENABLED, false).endObject();
}
public static XContentBuilder addTransformsConfigMappings(XContentBuilder builder) throws IOException {
@ -303,6 +303,9 @@ public final class TransformInternalIndex {
.endObject()
.startObject(TransformField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(TransformCheckpoint.CHECKPOINT.getPreferredName())
.field(TYPE, LONG)
.endObject();
}

View File

@ -79,8 +79,14 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
public static final int MINIMUM_PAGE_SIZE = 10;
public static final String COMPOSITE_AGGREGATION_NAME = "_transform";
private static final Logger logger = LogManager.getLogger(TransformIndexer.class);
// constants for checkpoint retention, static for now
// a checkpoint is only deleted if it is outdated by number AND by age

// number subtracted from the current checkpoint to get the bound below which checkpoints may be deleted
private static final long NUMBER_OF_CHECKPOINTS_TO_KEEP = 10;
// age in milliseconds subtracted from the current time to get the bound before which checkpoints may be deleted
private static final long RETENTION_OF_CHECKPOINTS_MS = 864000000L; // 10 days
// a cleanup is attempted once the checkpoint has advanced by more than this since the last successful cleanup
private static final long CHECKPOINT_CLEANUP_INTERVAL = 100L; // every 100 checkpoints
protected final TransformConfigManager transformsConfigManager;
private final CheckpointProvider checkpointProvider;
private final TransformProgressGatherer progressGatherer;
@ -111,6 +117,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
private volatile Map<String, Set<String>> changedBuckets;
private volatile Map<String, Object> changedBucketsAfterKey;
// checkpoint number recorded after the last successful cleanup of old checkpoints, 0 if none has completed yet
private volatile long lastCheckpointCleanup = 0L;
public TransformIndexer(
Executor executor,
TransformConfigManager transformsConfigManager,
@ -375,7 +383,13 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
if (context.shouldStopAtCheckpoint()) {
stop();
}
listener.onResponse(null);
if (checkpoint - lastCheckpointCleanup > CHECKPOINT_CLEANUP_INTERVAL) {
// delete old checkpoints, on a failure we keep going
cleanupOldCheckpoints(listener);
} else {
listener.onResponse(null);
}
} catch (Exception e) {
listener.onFailure(e);
}
@ -492,6 +506,44 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
}
}
/**
 * Cleanup old checkpoints
 *
 * Checkpoints are deleted if they are both more than {@code NUMBER_OF_CHECKPOINTS_TO_KEEP} behind the current
 * checkpoint and older than {@code RETENTION_OF_CHECKPOINTS_MS}. If either bound is not positive yet, nothing
 * is deleted.
 *
 * A failed cleanup is logged and audited but never fails the listener; because the cleanup marker is only
 * advanced on success, the cleanup is attempted again after the next checkpoint.
 *
 * @param listener listener to call after done, always completed with {@code null}
 */
private void cleanupOldCheckpoints(ActionListener<Void> listener) {
    long now = getTime();
    // read the checkpoint once, so the deletion bound and the recorded marker refer to the same checkpoint
    long currentCheckpoint = context.getCheckpoint();
    long checkpointLowerBound = currentCheckpoint - NUMBER_OF_CHECKPOINTS_TO_KEEP;
    long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS;

    if (checkpointLowerBound > 0 && lowerBoundEpochMs > 0) {
        transformsConfigManager.deleteOldCheckpoints(
            transformConfig.getId(),
            checkpointLowerBound,
            lowerBoundEpochMs,
            ActionListener.wrap(deletes -> {
                logger.debug("[{}] deleted [{}] outdated checkpoints", getJobId(), deletes);
                // record the successful cleanup before handing control back, so that whatever the listener
                // triggers next already sees the updated marker
                lastCheckpointCleanup = currentCheckpoint;
                listener.onResponse(null);
            }, e -> {
                logger.warn(
                    new ParameterizedMessage("[{}] failed to cleanup old checkpoints, retrying after next checkpoint", getJobId()),
                    e
                );
                auditor.warning(
                    getJobId(),
                    "Failed to cleanup old checkpoints, retrying after next checkpoint. Exception: " + e.getMessage()
                );
                // a failed cleanup must not fail the transform, keep going
                listener.onResponse(null);
            })
        );
    } else {
        logger.debug("[{}] checked for outdated checkpoints", getJobId());
        listener.onResponse(null);
    }
}
private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> {
logger.trace("[{}] change detected [{}].", getJobId(), hasChanged);
@ -788,6 +840,13 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
context.markAsFailed(failureMessage);
}
/**
 * Source of the current time, abstracted so tests can substitute their own clock.
 *
 * @return the current time in milliseconds since the epoch
 */
long getTime() {
    final long nowMillis = System.currentTimeMillis();
    return nowMillis;
}
/**
* Indicates if an audit message should be written when onFinish is called for the given checkpoint
* We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99

View File

@ -81,6 +81,16 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
listener.onResponse(true);
}
/**
 * Removes all in-memory checkpoints of the given transform whose checkpoint number is lower than
 * {@code deleteCheckpointsBelow} and whose timestamp is lower than {@code deleteOlderThan}.
 *
 * @param transformId id of the transform whose checkpoints should be pruned
 * @param deleteCheckpointsBelow exclusive upper bound for the checkpoint number of checkpoints to delete
 * @param deleteOlderThan exclusive upper bound for the timestamp of checkpoints to delete
 * @param listener receives the number of removed checkpoints, 0 if no checkpoints are stored for the transform
 */
@Override
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
    List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);

    // nothing stored for this transform: report zero deletions instead of dereferencing null
    if (checkpointsById == null) {
        listener.onResponse(0L);
        return;
    }

    int sizeBeforeDelete = checkpointsById.size();
    checkpointsById.removeIf(cp -> cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan);
    listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size()));
}
@Override
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener) {
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);

View File

@ -425,4 +425,94 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
is(true)
);
}
/**
 * Stores 100 checkpoints next to a stored doc and a config of the same transform, then verifies that
 * deleteOldCheckpoints only removes checkpoints matching both the number and the time criterion and
 * leaves the other documents untouched.
 */
public void testDeleteOldCheckpoints() throws InterruptedException {
    final String transformId = randomAlphaOfLengthBetween(1, 10);
    // checkpoint i gets the timestamp baseTime + i * 200
    final long baseTime = System.currentTimeMillis() - randomLongBetween(20000, 40000);

    // unrelated documents of the same transform, they must survive every deletion below
    final TransformStoredDoc unrelatedStoredDoc = TransformStoredDocTests.randomTransformStoredDoc(transformId);
    final SeqNoPrimaryTermAndIndex storedDocPosition = new SeqNoPrimaryTermAndIndex(
        0,
        1,
        TransformInternalIndexConstants.LATEST_INDEX_NAME
    );
    assertAsync(
        listener -> transformConfigManager.putOrUpdateTransformStoredDoc(unrelatedStoredDoc, null, listener),
        storedDocPosition,
        null,
        null
    );

    final TransformConfig unrelatedConfig = TransformConfigTests.randomTransformConfig(transformId);
    assertAsync(listener -> transformConfigManager.putTransformConfiguration(unrelatedConfig, listener), true, null, null);

    // store checkpoints 1..100
    for (int number = 1; number <= 100; ++number) {
        final TransformCheckpoint toStore = new TransformCheckpoint(
            transformId,
            baseTime + number * 200,
            number,
            Collections.emptyMap(),
            baseTime - 100 + number * 200
        );
        assertAsync(listener -> transformConfigManager.putTransformCheckpoint(toStore, listener), true, null, null);
    }

    // sanity check: an arbitrary checkpoint can be read back
    final int pickedCheckpoint = randomIntBetween(1, 100);
    final TransformCheckpoint expectedCheckpoint = new TransformCheckpoint(
        transformId,
        baseTime + pickedCheckpoint * 200,
        pickedCheckpoint,
        Collections.emptyMap(),
        baseTime - 100 + pickedCheckpoint * 200
    );
    assertAsync(
        listener -> transformConfigManager.getTransformCheckpoint(transformId, pickedCheckpoint, listener),
        expectedCheckpoint,
        null,
        null
    );

    // time bounds that cover checkpoints up to 20 respectively up to 100
    final long olderThanFirstTwenty = baseTime + 1 + 20L * 200;
    final long olderThanAll = baseTime + 1 + 100L * 200;

    // the checkpoint number is the limiting criterion (time would allow more): 1..10 are removed
    assertAsync(listener -> transformConfigManager.deleteOldCheckpoints(transformId, 11L, olderThanFirstTwenty, listener), 10L, null, null);

    // the time is the limiting criterion (checkpoint number would allow more): 11..20 are removed
    assertAsync(listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, olderThanFirstTwenty, listener), 10L, null, null);

    // repeating the same call finds nothing left to delete
    assertAsync(listener -> transformConfigManager.deleteOldCheckpoints(transformId, 30L, olderThanFirstTwenty, listener), 0L, null, null);

    // everything that is left (21..100) is removed
    assertAsync(listener -> transformConfigManager.deleteOldCheckpoints(transformId, 101L, olderThanAll, listener), 80L, null, null);

    // the unrelated documents are still there
    assertAsync(
        listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
        Tuple.tuple(unrelatedStoredDoc, storedDocPosition),
        null,
        null
    );
    assertAsync(
        listener -> transformConfigManager.getTransformConfiguration(unrelatedConfig.getId(), listener),
        unrelatedConfig,
        null,
        null
    );
}
}