mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
As we prepare to introduce a new index for storing additional information about data frame analytics jobs (e.g. intrumentation), renaming this class to `DestinationIndex` better captures what it does and leaves its prior name available for a more suitable use. Backport of #51353 Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
144e037941
commit
3443d69883
@ -186,7 +186,7 @@ public class DataFrameAnalyticsManager {
|
|||||||
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
|
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
|
||||||
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
|
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
|
||||||
reindexRequest.setDestIndex(config.getDest().getIndex());
|
reindexRequest.setDestIndex(config.getDest().getIndex());
|
||||||
reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id"));
|
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
|
||||||
|
|
||||||
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
||||||
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
||||||
@ -206,7 +206,7 @@ public class DataFrameAnalyticsManager {
|
|||||||
config.getId(),
|
config.getId(),
|
||||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX, indexResponse.indices()[0]));
|
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX, indexResponse.indices()[0]));
|
||||||
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
|
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
|
||||||
DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
|
DestinationIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
|
||||||
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
|
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
|
||||||
copyIndexCreatedListener::onFailure
|
copyIndexCreatedListener::onFailure
|
||||||
));
|
));
|
||||||
@ -217,7 +217,7 @@ public class DataFrameAnalyticsManager {
|
|||||||
config.getId(),
|
config.getId(),
|
||||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
|
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
|
||||||
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
|
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
|
||||||
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
|
DestinationIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
|
||||||
} else {
|
} else {
|
||||||
copyIndexCreatedListener.onFailure(e);
|
copyIndexCreatedListener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -43,9 +43,9 @@ import java.util.function.Supplier;
|
|||||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata.
|
* {@link DestinationIndex} class encapsulates logic for creating destination index based on source index metadata.
|
||||||
*/
|
*/
|
||||||
public final class DataFrameAnalyticsIndex {
|
public final class DestinationIndex {
|
||||||
|
|
||||||
public static final String ID_COPY = "ml__id_copy";
|
public static final String ID_COPY = "ml__id_copy";
|
||||||
|
|
||||||
@ -66,7 +66,7 @@ public final class DataFrameAnalyticsIndex {
|
|||||||
*/
|
*/
|
||||||
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};
|
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};
|
||||||
|
|
||||||
private DataFrameAnalyticsIndex() {}
|
private DestinationIndex() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates destination index based on source index metadata.
|
* Creates destination index based on source index metadata.
|
@ -24,7 +24,7 @@ import org.elasticsearch.search.fetch.StoredFieldsContext;
|
|||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.xpack.core.ClientHelper;
|
import org.elasticsearch.xpack.core.ClientHelper;
|
||||||
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
|
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
|
||||||
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
|
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
|
||||||
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -131,7 +131,7 @@ public class DataFrameDataExtractor {
|
|||||||
.setScroll(SCROLL_TIMEOUT)
|
.setScroll(SCROLL_TIMEOUT)
|
||||||
// This ensures the search throws if there are failures and the scroll context gets cleared automatically
|
// This ensures the search throws if there are failures and the scroll context gets cleared automatically
|
||||||
.setAllowPartialSearchResults(false)
|
.setAllowPartialSearchResults(false)
|
||||||
.addSort(DataFrameAnalyticsIndex.ID_COPY, SortOrder.ASC)
|
.addSort(DestinationIndex.ID_COPY, SortOrder.ASC)
|
||||||
.setIndices(context.indices)
|
.setIndices(context.indices)
|
||||||
.setSize(context.scrollSize)
|
.setSize(context.scrollSize)
|
||||||
.setQuery(context.query);
|
.setQuery(context.query);
|
||||||
|
@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection;
|
|||||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||||
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
|
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
|
||||||
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsIndex;
|
import org.elasticsearch.xpack.ml.dataframe.DestinationIndex;
|
||||||
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
||||||
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
|
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
|
||||||
|
|
||||||
@ -52,7 +52,7 @@ public class ExtractedFieldsDetector {
|
|||||||
* Fields to ignore. These are mostly internal meta fields.
|
* Fields to ignore. These are mostly internal meta fields.
|
||||||
*/
|
*/
|
||||||
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
|
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
|
||||||
"_source", "_type", "_uid", "_version", "_feature", "_ignored", DataFrameAnalyticsIndex.ID_COPY);
|
"_source", "_type", "_uid", "_version", "_feature", "_ignored", DestinationIndex.ID_COPY);
|
||||||
|
|
||||||
private final String[] index;
|
private final String[] index;
|
||||||
private final DataFrameAnalyticsConfig config;
|
private final DataFrameAnalyticsConfig config;
|
||||||
|
@ -68,7 +68,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|||||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class DataFrameAnalyticsIndexTests extends ESTestCase {
|
public class DestinationIndexTests extends ESTestCase {
|
||||||
|
|
||||||
private static final String ANALYTICS_ID = "some-analytics-id";
|
private static final String ANALYTICS_ID = "some-analytics-id";
|
||||||
private static final String[] SOURCE_INDEX = new String[] {"source-index"};
|
private static final String[] SOURCE_INDEX = new String[] {"source-index"};
|
||||||
@ -155,7 +155,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
|
|||||||
doAnswer(callListenerOnResponse(getMappingsResponse))
|
doAnswer(callListenerOnResponse(getMappingsResponse))
|
||||||
.when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());
|
.when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());
|
||||||
|
|
||||||
DataFrameAnalyticsIndex.createDestinationIndex(
|
DestinationIndex.createDestinationIndex(
|
||||||
client,
|
client,
|
||||||
clock,
|
clock,
|
||||||
config,
|
config,
|
||||||
@ -246,7 +246,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
|
|||||||
doAnswer(callListenerOnResponse(getSettingsResponse)).when(client).execute(eq(GetSettingsAction.INSTANCE), any(), any());
|
doAnswer(callListenerOnResponse(getSettingsResponse)).when(client).execute(eq(GetSettingsAction.INSTANCE), any(), any());
|
||||||
doAnswer(callListenerOnResponse(getMappingsResponse)).when(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());
|
doAnswer(callListenerOnResponse(getMappingsResponse)).when(client).execute(eq(GetMappingsAction.INSTANCE), any(), any());
|
||||||
|
|
||||||
DataFrameAnalyticsIndex.createDestinationIndex(
|
DestinationIndex.createDestinationIndex(
|
||||||
client,
|
client,
|
||||||
clock,
|
clock,
|
||||||
config,
|
config,
|
||||||
@ -290,7 +290,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
|
|||||||
doAnswer(callListenerOnResponse(new AcknowledgedResponse(true)))
|
doAnswer(callListenerOnResponse(new AcknowledgedResponse(true)))
|
||||||
.when(client).execute(eq(PutMappingAction.INSTANCE), putMappingRequestCaptor.capture(), any());
|
.when(client).execute(eq(PutMappingAction.INSTANCE), putMappingRequestCaptor.capture(), any());
|
||||||
|
|
||||||
DataFrameAnalyticsIndex.updateMappingsToDestIndex(
|
DestinationIndex.updateMappingsToDestIndex(
|
||||||
client,
|
client,
|
||||||
config,
|
config,
|
||||||
getIndexResponse,
|
getIndexResponse,
|
||||||
@ -362,7 +362,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
|
|||||||
ElasticsearchStatusException e =
|
ElasticsearchStatusException e =
|
||||||
expectThrows(
|
expectThrows(
|
||||||
ElasticsearchStatusException.class,
|
ElasticsearchStatusException.class,
|
||||||
() -> DataFrameAnalyticsIndex.updateMappingsToDestIndex(
|
() -> DestinationIndex.updateMappingsToDestIndex(
|
||||||
client, config, getIndexResponse, ActionListener.wrap(Assert::fail)));
|
client, config, getIndexResponse, ActionListener.wrap(Assert::fail)));
|
||||||
assertThat(
|
assertThat(
|
||||||
e.getMessage(),
|
e.getMessage(),
|
Loading…
x
Reference in New Issue
Block a user