mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 13:26:02 +00:00
This sets up all indexing to one of our write aliases to require it actually be an alias. This allows failures scenarios to be captured quickly, loudly, and then potentially recovered.
This commit is contained in:
parent
ac6c806ec7
commit
a6da1fd73e
@ -110,6 +110,7 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
|
||||
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
|
||||
.source(xContentBuilder)
|
||||
.setRequireAlias(true)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
}
|
||||
}
|
||||
|
@ -203,8 +203,9 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {
|
||||
ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder();
|
||||
modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs);
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId));
|
||||
indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId));
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
|
||||
.id(ModelSnapshot.documentId(jobId, snapshotId))
|
||||
.setRequireAlias(true);
|
||||
if (immediateRefresh) {
|
||||
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
}
|
||||
@ -219,12 +220,12 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {
|
||||
private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) {
|
||||
assertThat(numDocs, greaterThan(0));
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
BulkRequest bulkRequest = new BulkRequest().requireAlias(true);
|
||||
for (int i = 1; i <= numDocs; ++i) {
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
|
||||
indexRequest.id(ModelState.documentId(jobId, snapshotId, i));
|
||||
// The exact contents of the model state doesn't matter - we are not going to try and restore it
|
||||
indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo")));
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
|
||||
.id(ModelState.documentId(jobId, snapshotId, i))
|
||||
// The exact contents of the model state doesn't matter - we are not going to try and restore it
|
||||
.source(Collections.singletonMap("compressed", Collections.singletonList("foo")));
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
|
||||
|
@ -216,6 +216,7 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
|
||||
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
|
||||
.source(xContentBuilder)
|
||||
.setRequireAlias(true)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
}
|
||||
}
|
||||
|
@ -451,6 +451,7 @@ public class MlConfigMigrator {
|
||||
String documentId = "ml-config";
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
|
||||
.id(documentId)
|
||||
.setRequireAlias(true)
|
||||
.opType(DocWriteRequest.OpType.CREATE);
|
||||
|
||||
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
|
||||
|
@ -74,7 +74,7 @@ public class AnnotationPersister {
|
||||
public class Builder {
|
||||
|
||||
private final String jobId;
|
||||
private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true);
|
||||
private Supplier<Boolean> shouldRetry = () -> true;
|
||||
|
||||
private Builder(String jobId) {
|
||||
@ -115,7 +115,7 @@ public class AnnotationPersister {
|
||||
BulkResponse bulkResponse =
|
||||
resultsPersisterService.bulkIndexWithRetry(
|
||||
bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg));
|
||||
bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true);
|
||||
return bulkResponse;
|
||||
}
|
||||
}
|
||||
|
@ -299,6 +299,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
|
||||
}
|
||||
IndexRequest indexRequest = new IndexRequest(indexOrAlias)
|
||||
.id(progressDocId)
|
||||
.setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
|
||||
new StoredProgress(stats.get().getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
|
||||
|
@ -44,6 +44,7 @@ public class StatsPersister {
|
||||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||
WriteRequest.RefreshPolicy.NONE,
|
||||
docIdSupplier.apply(jobId),
|
||||
true,
|
||||
() -> true,
|
||||
errorMsg -> auditor.error(jobId,
|
||||
"failed to persist result with id [" + docIdSupplier.apply(jobId) + "]; " + errorMsg)
|
||||
|
@ -174,7 +174,7 @@ public class TrainedModelStatsService {
|
||||
if (stats.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
BulkRequest bulkRequest = new BulkRequest().requireAlias(true);
|
||||
stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add);
|
||||
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
if (bulkRequest.requests().isEmpty()) {
|
||||
|
@ -65,6 +65,7 @@ public class JobDataCountsPersister {
|
||||
ToXContent.EMPTY_PARAMS,
|
||||
WriteRequest.RefreshPolicy.NONE,
|
||||
DataCounts.documentId(jobId),
|
||||
true,
|
||||
() -> true,
|
||||
(msg) -> auditor.warning(jobId, "Job data_counts " + msg));
|
||||
} catch (IOException ioe) {
|
||||
@ -88,6 +89,7 @@ public class JobDataCountsPersister {
|
||||
try (XContentBuilder content = serialiseCounts(counts)) {
|
||||
final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
|
||||
.id(DataCounts.documentId(jobId))
|
||||
.setRequireAlias(true)
|
||||
.source(content);
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
|
@ -266,7 +266,7 @@ public class JobResultsPersister {
|
||||
public void persistCategoryDefinition(CategoryDefinition category, Supplier<Boolean> shouldRetry) {
|
||||
Persistable persistable =
|
||||
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()), category.getJobId(), category, category.getId());
|
||||
persistable.persist(shouldRetry);
|
||||
persistable.persist(shouldRetry, true);
|
||||
// Don't commit as we expect masses of these updates and they're not
|
||||
// read again by this process
|
||||
}
|
||||
@ -290,7 +290,7 @@ public class JobResultsPersister {
|
||||
: AnomalyDetectorsIndex.jobStateIndexWriteAlias();
|
||||
|
||||
Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId);
|
||||
persistable.persist(shouldRetry);
|
||||
persistable.persist(shouldRetry, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -311,7 +311,7 @@ public class JobResultsPersister {
|
||||
|
||||
Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId);
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
persistable.persist(listener);
|
||||
persistable.persist(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias));
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
@ -345,7 +345,7 @@ public class JobResultsPersister {
|
||||
modelSnapshot,
|
||||
ModelSnapshot.documentId(modelSnapshot));
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
return persistable.persist(shouldRetry);
|
||||
return persistable.persist(shouldRetry, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -356,7 +356,7 @@ public class JobResultsPersister {
|
||||
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
|
||||
Persistable persistable =
|
||||
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId());
|
||||
persistable.persist(shouldRetry);
|
||||
persistable.persist(shouldRetry, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -369,7 +369,7 @@ public class JobResultsPersister {
|
||||
Persistable persistable =
|
||||
new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId());
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
persistable.persist(listener);
|
||||
persistable.persist(listener, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -448,7 +448,7 @@ public class JobResultsPersister {
|
||||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||
DatafeedTimingStats.documentId(timingStats.getJobId()));
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
return persistable.persist(() -> true);
|
||||
return persistable.persist(() -> true, true);
|
||||
}
|
||||
|
||||
private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException {
|
||||
@ -483,7 +483,7 @@ public class JobResultsPersister {
|
||||
this.refreshPolicy = refreshPolicy;
|
||||
}
|
||||
|
||||
BulkResponse persist(Supplier<Boolean> shouldRetry) {
|
||||
BulkResponse persist(Supplier<Boolean> shouldRetry, boolean requireAlias) {
|
||||
logCall(indexName);
|
||||
try {
|
||||
return resultsPersisterService.indexWithRetry(jobId,
|
||||
@ -492,6 +492,7 @@ public class JobResultsPersister {
|
||||
params,
|
||||
refreshPolicy,
|
||||
id,
|
||||
requireAlias,
|
||||
shouldRetry,
|
||||
(msg) -> auditor.warning(jobId, id + " " + msg));
|
||||
} catch (IOException e) {
|
||||
@ -504,11 +505,15 @@ public class JobResultsPersister {
|
||||
}
|
||||
}
|
||||
|
||||
void persist(ActionListener<IndexResponse> listener) {
|
||||
void persist(ActionListener<IndexResponse> listener, boolean requireAlias) {
|
||||
logCall(indexName);
|
||||
|
||||
try (XContentBuilder content = toXContentBuilder(object, params)) {
|
||||
IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy);
|
||||
IndexRequest indexRequest = new IndexRequest(indexName)
|
||||
.id(id)
|
||||
.source(content)
|
||||
.setRefreshPolicy(refreshPolicy)
|
||||
.setRequireAlias(requireAlias);
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index);
|
||||
} catch (IOException e) {
|
||||
logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e);
|
||||
|
@ -137,7 +137,9 @@ public class IndexingStateProcessor implements StateProcessor {
|
||||
}
|
||||
|
||||
void persist(String indexOrAlias, BytesReference bytes) throws IOException {
|
||||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
BulkRequest bulkRequest = new BulkRequest()
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||
.requireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias));
|
||||
bulkRequest.add(bytes, indexOrAlias, XContentType.JSON);
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length());
|
||||
|
@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
@ -44,6 +45,8 @@ import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.ExceptionsHelper.status;
|
||||
|
||||
public class ResultsPersisterService {
|
||||
/**
|
||||
* List of rest statuses that we consider irrecoverable
|
||||
@ -52,6 +55,7 @@ public class ResultsPersisterService {
|
||||
Arrays.asList(
|
||||
RestStatus.GONE,
|
||||
RestStatus.NOT_IMPLEMENTED,
|
||||
// Not found is returned when we require an alias but the index is NOT an alias.
|
||||
RestStatus.NOT_FOUND,
|
||||
RestStatus.BAD_REQUEST,
|
||||
RestStatus.UNAUTHORIZED,
|
||||
@ -105,9 +109,10 @@ public class ResultsPersisterService {
|
||||
ToXContent.Params params,
|
||||
WriteRequest.RefreshPolicy refreshPolicy,
|
||||
String id,
|
||||
boolean requireAlias,
|
||||
Supplier<Boolean> shouldRetry,
|
||||
Consumer<String> msgHandler) throws IOException {
|
||||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy);
|
||||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias);
|
||||
try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) {
|
||||
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
|
||||
}
|
||||
@ -144,19 +149,18 @@ public class ResultsPersisterService {
|
||||
return bulkResponse;
|
||||
}
|
||||
for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
|
||||
if (itemResponse.isFailed()) {
|
||||
if (isIrrecoverable(itemResponse.getFailure().getCause())) {
|
||||
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
|
||||
LOGGER.warn(new ParameterizedMessage(
|
||||
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
|
||||
jobId,
|
||||
bulkResponse.buildFailureMessage()),
|
||||
unwrappedParticular);
|
||||
throw new ElasticsearchException(
|
||||
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
|
||||
unwrappedParticular,
|
||||
jobId);
|
||||
}
|
||||
if (itemResponse.isFailed() && isIrrecoverable(itemResponse.getFailure().getCause())) {
|
||||
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
|
||||
LOGGER.warn(new ParameterizedMessage(
|
||||
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
|
||||
jobId,
|
||||
bulkResponse.buildFailureMessage()),
|
||||
unwrappedParticular);
|
||||
throw new ElasticsearchStatusException(
|
||||
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
|
||||
status(unwrappedParticular),
|
||||
unwrappedParticular,
|
||||
jobId);
|
||||
}
|
||||
}
|
||||
retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
|
||||
@ -183,7 +187,11 @@ public class ResultsPersisterService {
|
||||
failureMessage = e.getDetailedMessage();
|
||||
if (isIrrecoverable(e)) {
|
||||
LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
|
||||
throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
|
||||
throw new ElasticsearchStatusException(
|
||||
"{} experienced failure that cannot be automatically retried",
|
||||
status(e),
|
||||
e,
|
||||
jobId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -197,10 +205,7 @@ public class ResultsPersisterService {
|
||||
*/
|
||||
private static boolean isIrrecoverable(Exception ex) {
|
||||
Throwable t = ExceptionsHelper.unwrapCause(ex);
|
||||
if (t instanceof ElasticsearchException) {
|
||||
return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
|
||||
}
|
||||
return false;
|
||||
return IRRECOVERABLE_REST_STATUSES.contains(status(t));
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user