[Transform] fix page size return in cat transform, add dps (#57871)
Fixes the page size reported by the cat transforms API after page size moved to settings (#56007) and adds documents per second (throttling) to the output. Fixes #56498.
parent f51f9b19c7
commit 95bd7b63b0
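
The heart of the change, in plain terms: when building a row for the cat transforms API, the page size is now taken from the transform's settings first, then from the deprecated pivot-level `max_page_search_size`, and only then from the shared default of 500, and a new `docs_per_second` (`dps`) column reports the throttling rate, printing `-` when it is not configured. A minimal, self-contained sketch of that fallback logic is below; the class and method names (`CatTransformColumnsSketch`, `resolvePageSize`, `renderDocsPerSecond`) are illustrative stand-ins, not part of the Elasticsearch codebase.

```java
// Sketch of the fallback order used when rendering the cat transforms columns.
// Names are illustrative; the real logic lives in RestCatTransformAction (see diff below).
public final class CatTransformColumnsSketch {

    // Mirrors Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE introduced in this commit.
    static final int DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = 500;

    /** Settings value wins, then the deprecated pivot-level value, then the shared default. */
    static int resolvePageSize(Integer settingsPageSize, Integer deprecatedPivotPageSize) {
        if (settingsPageSize != null) {
            return settingsPageSize;
        }
        if (deprecatedPivotPageSize != null) {
            return deprecatedPivotPageSize;
        }
        return DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE;
    }

    /** The new docs_per_second (dps) column prints "-" when throttling is not configured. */
    static String renderDocsPerSecond(Float docsPerSecond) {
        return docsPerSecond == null ? "-" : docsPerSecond.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolvePageSize(1000, null)); // page size moved to settings -> 1000
        System.out.println(resolvePageSize(null, 200));  // deprecated pivot value still reported -> 200
        System.out.println(resolvePageSize(null, null)); // nothing configured -> shared default 500
        System.out.println(renderDocsPerSecond(42.5f));  // throttled transform -> 42.5
        System.out.println(renderDocsPerSecond(null));   // unthrottled transform -> -
    }
}
```
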
@@ -20,9 +20,9 @@ Returns configuration and usage information about {transforms}.
[[cat-transforms-api-prereqs]]
==== {api-prereq-title}

* If the {es} {security-features} are enabled, you must have `monitor_transform`
cluster privileges to use this API. The built-in `transform_user` role has these
privileges. For more information, see <<security-privileges>> and
* If the {es} {security-features} are enabled, you must have `monitor_transform`
cluster privileges to use this API. The built-in `transform_user` role has these
privileges. For more information, see <<security-privileges>> and
<<built-in-roles>>.

//[[cat-transforms-api-desc]]

@@ -77,6 +77,10 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
`documents_indexed`, `doci`:::
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed]

`docs_per_second`, `dps`:::
(Default)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second]

`documents_processed`, `docp`:::
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed]

@@ -139,7 +143,7 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=state-transform]

`transform_type`, `tt`:::
(Default)
Indicates the type of {transform}: `batch` or `continuous`.
Indicates the type of {transform}: `batch` or `continuous`.

`trigger_count`, `tc`:::
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count]

@@ -55,8 +55,8 @@ teardown:
transform_id: "airline-transform-stats"
- match:
$body: |
/^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
/^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
(airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/

---
"Test cat transform stats with column selection":

@@ -95,8 +95,8 @@ teardown:
v: true
- match:
$body: |
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
(airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/
- do:
transform.delete_transform:
transform_id: "airline-transform-batch"

@@ -131,8 +131,8 @@ teardown:
v: true
- match:
$body: |
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
(airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ STOPPED \n)+ $/
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
(airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ - \s+ STOPPED \n)+ $/
- do:
transform.delete_transform:
transform_id: "airline-transform-continuous"

@@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry;
import org.elasticsearch.env.Environment;

@@ -132,6 +133,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
private final SetOnce<TransformServices> transformServices = new SetOnce<>();

public static final int DEFAULT_FAILURE_RETRIES = 10;
public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);

// How many times the transform task can retry on an non-critical failure
public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(

@@ -283,12 +286,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa

TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
TransformCheckpointService checkpointService = new TransformCheckpointService(
settings,
clusterService,
configManager,
auditor
);
TransformCheckpointService checkpointService = new TransformCheckpointService(settings, clusterService, configManager, auditor);
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());

transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

@@ -10,20 +10,21 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.transform.Transform;

import java.util.List;
import java.util.Map;

@@ -37,9 +38,6 @@ import static org.elasticsearch.xpack.core.transform.TransformField.ALLOW_NO_MAT

public class RestCatTransformAction extends AbstractCatAction {

private static final Integer DEFAULT_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
private static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);

@Override
public List<Route> routes() {
return unmodifiableList(asList(

@@ -66,8 +64,10 @@ public class RestCatTransformAction extends AbstractCatAction {
statsRequest.setAllowNoMatch(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true));

if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
PageParams pageParams = new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE));
PageParams pageParams = new PageParams(
restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)
);
request.setPageParams(pageParams);
statsRequest.setPageParams(pageParams);
}

@@ -101,138 +101,113 @@ public class RestCatTransformAction extends AbstractCatAction {
}

private static Table getTableWithHeader() {
return new Table()
.startHeaders()
return new Table().startHeaders()
// Transform config info
.addCell("id", TableColumnAttributeBuilder.builder("the id").build())
.addCell("create_time",
TableColumnAttributeBuilder.builder("transform creation time")
.setAliases("ct", "createTime")
.build())
.addCell("version",
TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created")
.setAliases("v")
.build())
.addCell("source_index",
TableColumnAttributeBuilder.builder("source index")
.setAliases("si", "sourceIndex")
.build())
.addCell("dest_index",
TableColumnAttributeBuilder.builder("destination index")
.setAliases("di", "destIndex")
.build())
.addCell("pipeline",
TableColumnAttributeBuilder.builder("transform pipeline")
.setAliases("p")
.build())
.addCell("description",
TableColumnAttributeBuilder.builder("description")
.setAliases("d")
.build())
.addCell("transform_type",
TableColumnAttributeBuilder.builder("batch or continuous transform")
.setAliases("tt")
.build())
.addCell("frequency",
TableColumnAttributeBuilder.builder("frequency of transform")
.setAliases("f")
.build())
.addCell("max_page_search_size",
TableColumnAttributeBuilder.builder("max page search size")
.setAliases("mpsz")
.build())

.addCell("create_time", TableColumnAttributeBuilder.builder("transform creation time").setAliases("ct", "createTime").build())
.addCell(
"version",
TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created").setAliases("v").build()
)
.addCell("source_index", TableColumnAttributeBuilder.builder("source index").setAliases("si", "sourceIndex").build())
.addCell("dest_index", TableColumnAttributeBuilder.builder("destination index").setAliases("di", "destIndex").build())
.addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline").setAliases("p").build())
.addCell("description", TableColumnAttributeBuilder.builder("description").setAliases("d").build())
.addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform").setAliases("tt").build())
.addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform").setAliases("f").build())
.addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size").setAliases("mpsz").build())
.addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second").setAliases("dps").build())
// Transform stats info
.addCell("state",
.addCell(
"state",
TableColumnAttributeBuilder.builder("transform state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build())
.addCell("reason",
TableColumnAttributeBuilder.builder("reason for the current state", false)
.setAliases("r", "reason")
.build())
.addCell("changes_last_detection_time",
TableColumnAttributeBuilder.builder("changes last detected time", false)
.setAliases("cldt")
.build())
.addCell("search_total",
TableColumnAttributeBuilder.builder("total number of search phases", false)
.setAliases("st")
.build())
.addCell("search_failure",
TableColumnAttributeBuilder.builder("total number of search failures", false)
.setAliases("sf")
.build())
.addCell("search_time",
TableColumnAttributeBuilder.builder("total search time", false)
.setAliases("stime")
.build())
.addCell("index_total",
TableColumnAttributeBuilder.builder("total number of index phases done by the transform", false)
.setAliases("it")
.build())
.addCell("index_failure",
TableColumnAttributeBuilder.builder("total number of index failures", false)
.setAliases("if")
.build())
.addCell("index_time",
TableColumnAttributeBuilder.builder("total time spent indexing documents", false)
.setAliases("itime")
.build())
.addCell("documents_processed",
TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed",
false)
.build()
)
.addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build())
.addCell(
"changes_last_detection_time",
TableColumnAttributeBuilder.builder("changes last detected time", false).setAliases("cldt").build()
)
.addCell("search_total", TableColumnAttributeBuilder.builder("total number of search phases", false).setAliases("st").build())
.addCell(
"search_failure",
TableColumnAttributeBuilder.builder("total number of search failures", false).setAliases("sf").build()
)
.addCell("search_time", TableColumnAttributeBuilder.builder("total search time", false).setAliases("stime").build())
.addCell(
"index_total",
TableColumnAttributeBuilder.builder("total number of index phases done by the transform", false).setAliases("it").build()
)
.addCell("index_failure", TableColumnAttributeBuilder.builder("total number of index failures", false).setAliases("if").build())
.addCell(
"index_time",
TableColumnAttributeBuilder.builder("total time spent indexing documents", false).setAliases("itime").build()
)
.addCell(
"documents_processed",
TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed", false)
.setAliases("docp")
.build())
.addCell("documents_indexed",
TableColumnAttributeBuilder.builder("the number of documents index to the destination index",
false)
.build()
)
.addCell(
"documents_indexed",
TableColumnAttributeBuilder.builder("the number of documents index to the destination index", false)
.setAliases("doci")
.build())
.addCell("trigger_count",
TableColumnAttributeBuilder.builder("the number of times the transform has been triggered", false)
.setAliases("tc")
.build())
.addCell("pages_processed",
TableColumnAttributeBuilder.builder("the number of pages processed", false)
.setAliases("pp")
.build())
.addCell("processing_time",
TableColumnAttributeBuilder.builder("the total time spent processing documents", false)
.setAliases("pt")
.build())
.addCell("checkpoint_duration_time_exp_avg",
.build()
)
.addCell(
"trigger_count",
TableColumnAttributeBuilder.builder("the number of times the transform has been triggered", false).setAliases("tc").build()
)
.addCell(
"pages_processed",
TableColumnAttributeBuilder.builder("the number of pages processed", false).setAliases("pp").build()
)
.addCell(
"processing_time",
TableColumnAttributeBuilder.builder("the total time spent processing documents", false).setAliases("pt").build()
)
.addCell(
"checkpoint_duration_time_exp_avg",
TableColumnAttributeBuilder.builder("exponential average checkpoint processing time (milliseconds)", false)
.setAliases("cdtea", "checkpointTimeExpAvg")
.build())
.addCell("indexed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents indexed", false)
.setAliases("idea")
.build())
.addCell("processed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents processed", false)
.setAliases("pdea")
.build())
.build()
)
.addCell(
"indexed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents indexed", false).setAliases("idea").build()
)
.addCell(
"processed_documents_exp_avg",
TableColumnAttributeBuilder.builder("exponential average number of documents processed", false).setAliases("pdea").build()
)
.endHeaders();
}

private Table buildTable(GetTransformAction.Response response, GetTransformStatsAction.Response statsResponse) {
Table table = getTableWithHeader();
Map<String, TransformStats> statsById = statsResponse.getTransformsStats().stream()
.collect(Collectors.toMap(TransformStats::getId, Function.identity()));
Map<String, TransformStats> statsById = statsResponse.getTransformsStats()
.stream()
.collect(Collectors.toMap(TransformStats::getId, Function.identity()));
response.getTransformConfigurations().forEach(config -> {
TransformStats stats = statsById.get(config.getId());
TransformCheckpointingInfo checkpointingInfo = null;
TransformIndexerStats transformIndexerStats = null;

if(stats != null) {
if (stats != null) {
checkpointingInfo = stats.getCheckpointingInfo();
transformIndexerStats = stats.getIndexerStats();
}

table
.startRow()
Integer maxPageSearchSize = config.getSettings() == null || config.getSettings().getMaxPageSearchSize() == null
? config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null
? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE
: config.getPivotConfig().getMaxPageSearchSize()
: config.getSettings().getMaxPageSearchSize();

table.startRow()
.addCell(config.getId())
.addCell(config.getCreateTime())
.addCell(config.getVersion())

@@ -241,9 +216,13 @@ public class RestCatTransformAction extends AbstractCatAction {
.addCell(config.getDestination().getPipeline())
.addCell(config.getDescription())
.addCell(config.getSyncConfig() == null ? "batch" : "continuous")
.addCell(config.getFrequency() == null ? DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
.addCell(config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null ?
DEFAULT_MAX_PAGE_SEARCH_SIZE : config.getPivotConfig().getMaxPageSearchSize())
.addCell(config.getFrequency() == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
.addCell(maxPageSearchSize)
.addCell(
config.getSettings() == null || config.getSettings().getDocsPerSecond() == null
? "-"
: config.getSettings().getDocsPerSecond()
)
.addCell(stats == null ? null : stats.getState())
.addCell(stats == null ? null : stats.getReason())
.addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())

@@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;

@@ -49,7 +50,6 @@ import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_ST
public class TransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener, TransformContext.Listener {

// Default interval the scheduler sends an event if the config does not specify a frequency
private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
private static final Logger logger = LogManager.getLogger(TransformTask.class);
private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING };
public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule";

@@ -538,7 +538,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
private SchedulerEngine.Schedule next() {
return (startTime, now) -> {
TimeValue frequency = transform.getFrequency();
return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis());
return now + (frequency == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY.getMillis() : frequency.getMillis());
};
}

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.transform.Transform;

import java.io.IOException;
import java.util.Collection;

@@ -49,7 +50,6 @@ import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class Pivot {
public static final int DEFAULT_INITIAL_PAGE_SIZE = 500;
public static final int TEST_QUERY_PAGE_SIZE = 50;

private static final String COMPOSITE_AGGREGATION_NAME = "_transform";

@@ -103,12 +103,10 @@ public class Pivot {
listener.onResponse(true);
}, e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
RestStatus status = unwrapped instanceof ElasticsearchException ?
((ElasticsearchException)unwrapped).status() :
RestStatus.SERVICE_UNAVAILABLE;
listener.onFailure(new ElasticsearchStatusException("Failed to test query",
status,
unwrapped));
RestStatus status = unwrapped instanceof ElasticsearchException
? ((ElasticsearchException) unwrapped).status()
: RestStatus.SERVICE_UNAVAILABLE;
listener.onFailure(new ElasticsearchStatusException("Failed to test query", status, unwrapped));
}));
}

@@ -124,14 +122,14 @@ public class Pivot {
* per page the page size is a multiplier for the costs of aggregating bucket.
*
* The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
* the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
* the default {@link Transform#DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE} is used.
*
* In future we might inspect the configuration and base the initial size on the aggregations used.
*
* @return the page size
*/
public int getInitialPageSize() {
return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
return config.getMaxPageSearchSize() == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : config.getMaxPageSearchSize();
}

public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {

@@ -36,11 +36,11 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.junit.After;
import org.junit.Before;

@@ -240,7 +240,7 @@ public class TransformIndexerTests extends ESTestCase {
new SettingsConfig(pageSize, null)
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
final long initialPageSize = pageSize == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : pageSize;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
throw new SearchPhaseExecutionException(
"query",

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.transforms.pivot.Aggregations.AggregationType;
import org.junit.After;
import org.junit.Before;

@@ -108,7 +109,7 @@ public class PivotTests extends ESTestCase {
assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));

pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
assertThat(pivot.getInitialPageSize(), equalTo(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE));

assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
}