[ML-DataFrame] Refactorings and tidying (#41248)
Remove unnecessary generic params from SingleGroupSource and unused code from the HLRC
This commit is contained in:
parent
6566979c18
commit
1d2365f5b6
|
@ -30,32 +30,9 @@ public abstract class SingleGroupSource implements ToXContentObject {
|
|||
protected static final ParseField FIELD = new ParseField("field");
|
||||
|
||||
public enum Type {
|
||||
TERMS(0),
|
||||
HISTOGRAM(1),
|
||||
DATE_HISTOGRAM(2);
|
||||
|
||||
private final byte id;
|
||||
|
||||
Type(int id) {
|
||||
this.id = (byte) id;
|
||||
}
|
||||
|
||||
public byte getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public static Type fromId(byte id) {
|
||||
switch (id) {
|
||||
case 0:
|
||||
return TERMS;
|
||||
case 1:
|
||||
return HISTOGRAM;
|
||||
case 2:
|
||||
return DATE_HISTOGRAM;
|
||||
default:
|
||||
throw new IllegalArgumentException("unknown type");
|
||||
}
|
||||
}
|
||||
TERMS,
|
||||
HISTOGRAM,
|
||||
DATE_HISTOGRAM;
|
||||
|
||||
public String value() {
|
||||
return name().toLowerCase(Locale.ROOT);
|
||||
|
|
|
@ -215,7 +215,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
}
|
||||
|
||||
public static DataFrameTransformConfig fromXContent(final XContentParser parser, @Nullable final String optionalTransformId,
|
||||
boolean lenient) throws IOException {
|
||||
boolean lenient) {
|
||||
|
||||
return lenient ? LENIENT_PARSER.apply(parser, optionalTransformId) : STRICT_PARSER.apply(parser, optionalTransformId);
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.time.ZoneOffset;
|
|||
import java.util.Objects;
|
||||
|
||||
|
||||
public class DateHistogramGroupSource extends SingleGroupSource<DateHistogramGroupSource> {
|
||||
public class DateHistogramGroupSource extends SingleGroupSource {
|
||||
|
||||
private static final String NAME = "data_frame_date_histogram_group";
|
||||
private static final ParseField TIME_ZONE = new ParseField("time_zone");
|
||||
|
@ -51,7 +51,7 @@ public class DateHistogramGroupSource extends SingleGroupSource<DateHistogramGro
|
|||
return new DateHistogramGroupSource(field);
|
||||
});
|
||||
|
||||
declareValuesSourceFields(parser, null);
|
||||
declareValuesSourceFields(parser);
|
||||
|
||||
parser.declareField((histogram, interval) -> {
|
||||
if (interval instanceof Long) {
|
||||
|
|
|
@ -40,9 +40,9 @@ public class GroupConfig implements Writeable, ToXContentObject {
|
|||
private static final Logger logger = LogManager.getLogger(GroupConfig.class);
|
||||
|
||||
private final Map<String, Object> source;
|
||||
private final Map<String, SingleGroupSource<?>> groups;
|
||||
private final Map<String, SingleGroupSource> groups;
|
||||
|
||||
public GroupConfig(final Map<String, Object> source, final Map<String, SingleGroupSource<?>> groups) {
|
||||
public GroupConfig(final Map<String, Object> source, final Map<String, SingleGroupSource> groups) {
|
||||
this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.GROUP_BY.getPreferredName());
|
||||
this.groups = groups;
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public class GroupConfig implements Writeable, ToXContentObject {
|
|||
});
|
||||
}
|
||||
|
||||
public Map <String, SingleGroupSource<?>> getGroups() {
|
||||
public Map <String, SingleGroupSource> getGroups() {
|
||||
return groups;
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,7 @@ public class GroupConfig implements Writeable, ToXContentObject {
|
|||
public static GroupConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
|
||||
NamedXContentRegistry registry = parser.getXContentRegistry();
|
||||
Map<String, Object> source = parser.mapOrdered();
|
||||
Map<String, SingleGroupSource<?>> groups = null;
|
||||
Map<String, SingleGroupSource> groups = null;
|
||||
|
||||
if (source.isEmpty()) {
|
||||
if (lenient) {
|
||||
|
@ -133,9 +133,9 @@ public class GroupConfig implements Writeable, ToXContentObject {
|
|||
return new GroupConfig(source, groups);
|
||||
}
|
||||
|
||||
private static Map<String, SingleGroupSource<?>> parseGroupConfig(final XContentParser parser,
|
||||
private static Map<String, SingleGroupSource> parseGroupConfig(final XContentParser parser,
|
||||
boolean lenient) throws IOException {
|
||||
LinkedHashMap<String, SingleGroupSource<?>> groups = new LinkedHashMap<>();
|
||||
LinkedHashMap<String, SingleGroupSource> groups = new LinkedHashMap<>();
|
||||
|
||||
// be parsing friendly, whether the token needs to be advanced or not (similar to what ObjectParser does)
|
||||
XContentParser.Token token;
|
||||
|
@ -158,7 +158,7 @@ public class GroupConfig implements Writeable, ToXContentObject {
|
|||
|
||||
token = parser.nextToken();
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
|
||||
SingleGroupSource<?> groupSource;
|
||||
SingleGroupSource groupSource;
|
||||
switch (groupType) {
|
||||
case TERMS:
|
||||
groupSource = TermsGroupSource.fromXContent(parser, lenient);
|
||||
|
|
|
@ -17,7 +17,7 @@ import java.util.Objects;
|
|||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
||||
|
||||
public class HistogramGroupSource extends SingleGroupSource<HistogramGroupSource> {
|
||||
public class HistogramGroupSource extends SingleGroupSource {
|
||||
|
||||
static final ParseField INTERVAL = new ParseField("interval");
|
||||
private static final String NAME = "data_frame_histogram_group";
|
||||
|
@ -44,7 +44,7 @@ public class HistogramGroupSource extends SingleGroupSource<HistogramGroupSource
|
|||
double interval = (double) args[1];
|
||||
return new HistogramGroupSource(field, interval);
|
||||
});
|
||||
declareValuesSourceFields(parser, null);
|
||||
declareValuesSourceFields(parser);
|
||||
parser.declareDouble(optionalConstructorArg(), INTERVAL);
|
||||
return parser;
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
|
|||
builder.field(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName());
|
||||
builder.startArray();
|
||||
|
||||
for (Entry<String, SingleGroupSource<?>> groupBy : groups.getGroups().entrySet()) {
|
||||
for (Entry<String, SingleGroupSource> groupBy : groups.getGroups().entrySet()) {
|
||||
builder.startObject();
|
||||
builder.startObject(groupBy.getKey());
|
||||
builder.field(groupBy.getValue().getType().value(), groupBy.getValue());
|
||||
|
|
|
@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||
import org.elasticsearch.common.xcontent.AbstractObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
|
@ -24,7 +23,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
|
|||
/*
|
||||
* Base class for a single source for group_by
|
||||
*/
|
||||
public abstract class SingleGroupSource<AB extends SingleGroupSource<AB>> implements Writeable, ToXContentObject {
|
||||
public abstract class SingleGroupSource implements Writeable, ToXContentObject {
|
||||
|
||||
public enum Type {
|
||||
TERMS(0),
|
||||
|
@ -64,8 +63,7 @@ public abstract class SingleGroupSource<AB extends SingleGroupSource<AB>> implem
|
|||
// TODO: add script
|
||||
protected final String field;
|
||||
|
||||
static <VB extends SingleGroupSource<?>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> parser,
|
||||
ValueType targetValueType) {
|
||||
static <T> void declareValuesSourceFields(AbstractObjectParser<? extends SingleGroupSource, T> parser) {
|
||||
// either script or field
|
||||
parser.declareString(optionalConstructorArg(), FIELD);
|
||||
}
|
||||
|
@ -109,7 +107,7 @@ public abstract class SingleGroupSource<AB extends SingleGroupSource<AB>> implem
|
|||
return false;
|
||||
}
|
||||
|
||||
final SingleGroupSource<?> that = (SingleGroupSource<?>) other;
|
||||
final SingleGroupSource that = (SingleGroupSource) other;
|
||||
|
||||
return Objects.equals(this.field, that.field);
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ import java.io.IOException;
|
|||
/*
|
||||
* A terms aggregation source for group_by
|
||||
*/
|
||||
public class TermsGroupSource extends SingleGroupSource<TermsGroupSource> {
|
||||
public class TermsGroupSource extends SingleGroupSource {
|
||||
private static final String NAME = "data_frame_terms_group";
|
||||
|
||||
private static final ConstructingObjectParser<TermsGroupSource, Void> STRICT_PARSER = createParser(false);
|
||||
|
@ -27,7 +27,7 @@ public class TermsGroupSource extends SingleGroupSource<TermsGroupSource> {
|
|||
return new TermsGroupSource(field);
|
||||
});
|
||||
|
||||
SingleGroupSource.declareValuesSourceFields(parser, null);
|
||||
SingleGroupSource.declareValuesSourceFields(parser);
|
||||
return parser;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,14 +29,14 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
|
|||
|
||||
public static GroupConfig randomGroupConfig() {
|
||||
Map<String, Object> source = new LinkedHashMap<>();
|
||||
Map<String, SingleGroupSource<?>> groups = new LinkedHashMap<>();
|
||||
Map<String, SingleGroupSource> groups = new LinkedHashMap<>();
|
||||
|
||||
// ensure that the unlikely does not happen: 2 group_by's share the same name
|
||||
Set<String> names = new HashSet<>();
|
||||
for (int i = 0; i < randomIntBetween(1, 20); ++i) {
|
||||
String targetFieldName = randomAlphaOfLengthBetween(1, 20);
|
||||
if (names.add(targetFieldName)) {
|
||||
SingleGroupSource<?> groupBy;
|
||||
SingleGroupSource groupBy;
|
||||
Type type = randomFrom(SingleGroupSource.Type.values());
|
||||
switch (type) {
|
||||
case TERMS:
|
||||
|
@ -88,7 +88,7 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
|
|||
}
|
||||
}
|
||||
|
||||
private static Map<String, Object> getSource(SingleGroupSource<?> groupSource) {
|
||||
private static Map<String, Object> getSource(SingleGroupSource groupSource) {
|
||||
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
|
||||
XContentBuilder content = groupSource.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
|
||||
|
|
|
@ -28,8 +28,6 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsActio
|
|||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
|
||||
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
|
||||
|
||||
|
||||
|
@ -62,7 +60,7 @@ public class TransportGetDataFrameTransformsAction extends AbstractTransportGetR
|
|||
}
|
||||
|
||||
@Override
|
||||
protected DataFrameTransformConfig parse(XContentParser parser) throws IOException {
|
||||
protected DataFrameTransformConfig parse(XContentParser parser) {
|
||||
return DataFrameTransformConfig.fromXContent(parser, null, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -85,9 +85,8 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
|
||||
// remove all internal fields
|
||||
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
|
||||
.map(record -> {
|
||||
.peek(record -> {
|
||||
record.keySet().removeIf(k -> k.startsWith("_"));
|
||||
return record;
|
||||
}).collect(Collectors.toList());
|
||||
listener.onResponse(results);
|
||||
},
|
||||
|
|
|
@ -7,20 +7,13 @@
|
|||
package org.elasticsearch.xpack.dataframe.checkpoint;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class CheckpointException extends ElasticsearchException {
|
||||
public CheckpointException(String msg, Object... params) {
|
||||
class CheckpointException extends ElasticsearchException {
|
||||
CheckpointException(String msg, Object... params) {
|
||||
super(msg, null, params);
|
||||
}
|
||||
|
||||
public CheckpointException(String msg, Throwable cause, Object... params) {
|
||||
CheckpointException(String msg, Throwable cause, Object... params) {
|
||||
super(msg, cause, params);
|
||||
}
|
||||
|
||||
public CheckpointException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -240,7 +240,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
public synchronized void triggered(Event event) {
|
||||
// for now no rerun, so only trigger if checkpoint == 0
|
||||
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
|
||||
logger.debug("Data frame indexer [" + event.getJobName() + "] schedule has triggered, state: [" + indexer.getState() + "]");
|
||||
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexer.getState());
|
||||
indexer.maybeTriggerAsyncJob(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
@ -336,7 +336,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
@Override
|
||||
public synchronized boolean maybeTriggerAsyncJob(long now) {
|
||||
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
|
||||
logger.debug("Schedule was triggered for transform [" + getJobId() + "] but task is failed. Ignoring trigger.");
|
||||
logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,10 +133,11 @@ public final class SchemaUtil {
|
|||
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
|
||||
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
|
||||
|
||||
logger.debug(
|
||||
"Deduced mapping for: [" + targetFieldName + "], agg type [" + aggregationName + "] to [" + destinationMapping + "]");
|
||||
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
|
||||
targetFieldName, aggregationName, destinationMapping);
|
||||
|
||||
if (Aggregations.isDynamicMapping(destinationMapping)) {
|
||||
logger.info("Dynamic target mapping set for field ["+ targetFieldName +"] and aggregation [" + aggregationName +"]");
|
||||
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
|
||||
} else if (destinationMapping != null) {
|
||||
targetMapping.put(targetFieldName, destinationMapping);
|
||||
} else {
|
||||
|
@ -146,8 +147,7 @@ public final class SchemaUtil {
|
|||
|
||||
fieldNamesForGrouping.forEach((targetFieldName, sourceFieldName) -> {
|
||||
String destinationMapping = sourceMappings.get(sourceFieldName);
|
||||
logger.debug(
|
||||
"Deduced mapping for: [" + targetFieldName + "] to [" + destinationMapping + "]");
|
||||
logger.debug("Deduced mapping for: [{}] to [{}]", targetFieldName, destinationMapping);
|
||||
if (destinationMapping != null) {
|
||||
targetMapping.put(targetFieldName, destinationMapping);
|
||||
} else {
|
||||
|
@ -187,7 +187,9 @@ public final class SchemaUtil {
|
|||
final Map<?, ?> map = (Map<?, ?>) typeMap;
|
||||
if (map.containsKey("type")) {
|
||||
String type = map.get("type").toString();
|
||||
logger.debug("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName +"]");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName + "]");
|
||||
}
|
||||
// TODO: overwrites types, requires resolve if
|
||||
// types are mixed
|
||||
extractedTypes.put(fieldName, type);
|
||||
|
|
Loading…
Reference in New Issue