[ML] Add a file structure determination endpoint (#33471)

This endpoint accepts an arbitrary file in the request body and
attempts to determine its structure.  If successful, it also
proposes mappings that could be used when indexing the file's
contents, and calculates simple per-field statistics that are
useful in the data preparation step prior to configuring
machine learning jobs.
This commit is contained in:
David Roberts 2018-09-07 17:41:57 +01:00 committed by GitHub
parent 4d233107f8
commit e42cc5cd8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 596 additions and 27 deletions

View File

@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
@ -265,6 +266,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
GetCalendarEventsAction.INSTANCE, GetCalendarEventsAction.INSTANCE,
PostCalendarEventsAction.INSTANCE, PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE, PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
// security // security
ClearRealmCacheAction.INSTANCE, ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE, ClearRolesCacheAction.INSTANCE,

View File

@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
 * Action definition for finding the structure of a file.  Holds the
 * action name together with the request and response types used with it.
 */
public class FindFileStructureAction extends Action<FindFileStructureAction.Response> {

    public static final FindFileStructureAction INSTANCE = new FindFileStructureAction();
    public static final String NAME = "cluster:monitor/xpack/ml/findfilestructure";

    /** Singleton: use {@link #INSTANCE}. */
    private FindFileStructureAction() {
        super(NAME);
    }

    @Override
    public Response newResponse() {
        return new Response();
    }

    /** Request builder that starts from an empty {@link Request}. */
    static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

        RequestBuilder(ElasticsearchClient client, FindFileStructureAction action) {
            super(client, action, new Request());
        }
    }

    /**
     * Response wrapping the {@link FileStructure} that was determined.
     * It is rendered as the file structure itself and always reports
     * {@link RestStatus#OK}.
     */
    public static class Response extends ActionResponse implements StatusToXContentObject, Writeable {

        private FileStructure fileStructure;

        /** Creates an empty response; the structure is filled in by {@link #readFrom(StreamInput)}. */
        Response() {
        }

        public Response(FileStructure fileStructure) {
            this.fileStructure = fileStructure;
        }

        public FileStructure getFileStructure() {
            return fileStructure;
        }

        @Override
        public RestStatus status() {
            return RestStatus.OK;
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            fileStructure.toXContent(builder, params);
            return builder;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            fileStructure = new FileStructure(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            fileStructure.writeTo(out);
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            return Objects.equals(fileStructure, ((Response) other).fileStructure);
        }

        @Override
        public int hashCode() {
            return Objects.hash(fileStructure);
        }
    }

    /**
     * Request carrying the sample to analyse and, optionally, the number
     * of lines to sample from it.
     */
    public static class Request extends ActionRequest {

        public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");

        /** Optional; {@code null} means the caller did not specify a value. */
        private Integer linesToSample;
        private BytesReference sample;

        public Request() {
        }

        public Integer getLinesToSample() {
            return linesToSample;
        }

        public void setLinesToSample(Integer linesToSample) {
            this.linesToSample = linesToSample;
        }

        public BytesReference getSample() {
            return sample;
        }

        public void setSample(BytesReference sample) {
            this.sample = sample;
        }

        /**
         * Checks that {@code lines_to_sample}, when given, is positive and
         * that a non-empty sample has been supplied.
         *
         * @return the accumulated validation errors, or {@code null} if the request is valid
         */
        @Override
        public ActionRequestValidationException validate() {
            ActionRequestValidationException errors = null;

            if (linesToSample != null && linesToSample <= 0) {
                String message = LINES_TO_SAMPLE.getPreferredName() + " must be positive if specified";
                errors = addValidationError(message, errors);
            }

            boolean sampleMissing = sample == null || sample.length() == 0;
            if (sampleMissing) {
                errors = addValidationError("sample must be specified", errors);
            }

            return errors;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            linesToSample = in.readOptionalVInt();
            sample = in.readBytesReference();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeOptionalVInt(linesToSample);
            out.writeBytesReference(sample);
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            Request that = (Request) other;
            return Objects.equals(linesToSample, that.linesToSample)
                && Objects.equals(sample, that.sample);
        }

        @Override
        public int hashCode() {
            return Objects.hash(linesToSample, sample);
        }
    }
}

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder; package org.elasticsearch.xpack.core.ml.filestructurefinder;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,7 +19,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class FieldStats implements ToXContentObject { public class FieldStats implements ToXContentObject, Writeable {
static final ParseField COUNT = new ParseField("count"); static final ParseField COUNT = new ParseField("count");
static final ParseField CARDINALITY = new ParseField("cardinality"); static final ParseField CARDINALITY = new ParseField("cardinality");
@ -64,6 +67,27 @@ public class FieldStats implements ToXContentObject {
this.topHits = (topHits == null) ? Collections.emptyList() : Collections.unmodifiableList(topHits); this.topHits = (topHits == null) ? Collections.emptyList() : Collections.unmodifiableList(topHits);
} }
/**
 * Deserializing constructor.  Reads the fields from the stream in the
 * same order that {@link #writeTo(StreamOutput)} writes them.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public FieldStats(StreamInput in) throws IOException {
count = in.readVLong();
cardinality = in.readVInt();
// The four summary values are read with the optional-double accessor,
// matching the optional writes in writeTo.
minValue = in.readOptionalDouble();
maxValue = in.readOptionalDouble();
meanValue = in.readOptionalDouble();
medianValue = in.readOptionalDouble();
topHits = in.readList(StreamInput::readMap);
}
/**
 * Writes the fields to the stream: count, cardinality, the four
 * optional summary values (min, max, mean, median) and then the top hits.
 * The order must stay in step with the {@code FieldStats(StreamInput)}
 * constructor, which reads them back in the same sequence.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVInt(cardinality);
out.writeOptionalDouble(minValue);
out.writeOptionalDouble(maxValue);
out.writeOptionalDouble(meanValue);
out.writeOptionalDouble(medianValue);
out.writeCollection(topHits, StreamOutput::writeMap);
}
public long getCount() { public long getCount() {
return count; return count;
} }

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.ml.filestructurefinder; package org.elasticsearch.xpack.core.ml.filestructurefinder;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -24,7 +27,7 @@ import java.util.TreeMap;
/** /**
* Stores the file format determined by Machine Learning. * Stores the file format determined by Machine Learning.
*/ */
public class FileStructure implements ToXContentObject { public class FileStructure implements ToXContentObject, Writeable {
public enum Format { public enum Format {
@ -79,6 +82,8 @@ public class FileStructure implements ToXContentObject {
} }
} }
public static final String EXPLAIN = "explain";
static final ParseField NUM_LINES_ANALYZED = new ParseField("num_lines_analyzed"); static final ParseField NUM_LINES_ANALYZED = new ParseField("num_lines_analyzed");
static final ParseField NUM_MESSAGES_ANALYZED = new ParseField("num_messages_analyzed"); static final ParseField NUM_MESSAGES_ANALYZED = new ParseField("num_messages_analyzed");
static final ParseField SAMPLE_START = new ParseField("sample_start"); static final ParseField SAMPLE_START = new ParseField("sample_start");
@ -176,6 +181,66 @@ public class FileStructure implements ToXContentObject {
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation)); this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
} }
/**
 * Deserializing constructor.  Reads the fields from the stream in the
 * same order that {@link #writeTo(StreamOutput)} writes them.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public FileStructure(StreamInput in) throws IOException {
numLinesAnalyzed = in.readVInt();
numMessagesAnalyzed = in.readVInt();
sampleStart = in.readString();
charset = in.readString();
hasByteOrderMarker = in.readOptionalBoolean();
format = in.readEnum(Format.class);
multilineStartPattern = in.readOptionalString();
excludeLinesPattern = in.readOptionalString();
// Nullable list: a boolean presence flag precedes the list contents.
inputFields = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
hasHeaderRow = in.readOptionalBoolean();
// Nullable delimiter: presence flag, then the character sent as a VInt.
delimiter = in.readBoolean() ? (char) in.readVInt() : null;
shouldTrimFields = in.readOptionalBoolean();
grokPattern = in.readOptionalString();
// Nullable list: a boolean presence flag precedes the list contents.
timestampFormats = in.readBoolean() ? Collections.unmodifiableList(in.readList(StreamInput::readString)) : null;
timestampField = in.readOptionalString();
needClientTimezone = in.readBoolean();
// Both maps are copied into TreeMaps and wrapped as unmodifiable sorted maps.
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
}
/**
 * Writes this structure to the stream.  The nullable {@code inputFields},
 * {@code delimiter} and {@code timestampFormats} are each preceded by a
 * boolean presence flag.  The field order must stay in step with the
 * {@code FileStructure(StreamInput)} constructor.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeVInt(numLinesAnalyzed);
    out.writeVInt(numMessagesAnalyzed);
    out.writeString(sampleStart);
    out.writeString(charset);
    out.writeOptionalBoolean(hasByteOrderMarker);
    out.writeEnum(format);
    out.writeOptionalString(multilineStartPattern);
    out.writeOptionalString(excludeLinesPattern);

    final boolean hasInputFields = inputFields != null;
    out.writeBoolean(hasInputFields);
    if (hasInputFields) {
        out.writeCollection(inputFields, StreamOutput::writeString);
    }

    out.writeOptionalBoolean(hasHeaderRow);

    final boolean hasDelimiter = delimiter != null;
    out.writeBoolean(hasDelimiter);
    if (hasDelimiter) {
        out.writeVInt(delimiter);
    }

    out.writeOptionalBoolean(shouldTrimFields);
    out.writeOptionalString(grokPattern);

    final boolean hasTimestampFormats = timestampFormats != null;
    out.writeBoolean(hasTimestampFormats);
    if (hasTimestampFormats) {
        out.writeCollection(timestampFormats, StreamOutput::writeString);
    }

    out.writeOptionalString(timestampField);
    out.writeBoolean(needClientTimezone);
    out.writeMap(mappings);
    out.writeMap(fieldStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
    out.writeCollection(explanation, StreamOutput::writeString);
}
public int getNumLinesAnalyzed() { public int getNumLinesAnalyzed() {
return numLinesAnalyzed; return numLinesAnalyzed;
} }
@ -300,7 +365,9 @@ public class FileStructure implements ToXContentObject {
} }
builder.endObject(); builder.endObject();
} }
builder.field(EXPLANATION.getPreferredName(), explanation); if (params.paramAsBoolean(EXPLAIN, false)) {
builder.field(EXPLANATION.getPreferredName(), explanation);
}
builder.endObject(); builder.endObject();
return builder; return builder;

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.AbstractStreamableTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
/**
 * Tests for {@link FindFileStructureAction.Request}: random instances for
 * the {@link AbstractStreamableTestCase} base class, plus checks of the
 * request validation messages.
 */
public class FindFileStructureActionRequestTests extends AbstractStreamableTestCase<FindFileStructureAction.Request> {

    @Override
    protected FindFileStructureAction.Request createBlankInstance() {
        return new FindFileStructureAction.Request();
    }

    @Override
    protected FindFileStructureAction.Request createTestInstance() {
        FindFileStructureAction.Request instance = new FindFileStructureAction.Request();

        // lines_to_sample is optional, so only set it some of the time
        if (randomBoolean()) {
            instance.setLinesToSample(randomIntBetween(10, 2000));
        }

        int sampleLength = randomIntBetween(1000, 20000);
        instance.setSample(new BytesArray(randomByteArrayOfLength(sampleLength)));

        return instance;
    }

    public void testValidateLinesToSample() {
        FindFileStructureAction.Request invalid = new FindFileStructureAction.Request();
        invalid.setLinesToSample(randomFrom(-1, 0));
        invalid.setSample(new BytesArray("foo\n"));

        assertValidationFails(invalid, " lines_to_sample must be positive if specified");
    }

    public void testValidateSample() {
        FindFileStructureAction.Request invalid = new FindFileStructureAction.Request();
        // Either leave the sample unset or set it to an empty one
        if (randomBoolean()) {
            invalid.setSample(BytesArray.EMPTY);
        }

        assertValidationFails(invalid, " sample must be specified");
    }

    /**
     * Asserts that validating the request fails with the standard prefix
     * and a message containing the expected text.
     */
    private void assertValidationFails(FindFileStructureAction.Request request, String expectedText) {
        ActionRequestValidationException e = request.validate();
        assertNotNull(e);
        assertThat(e.getMessage(), startsWith("Validation Failed: "));
        assertThat(e.getMessage(), containsString(expectedText));
    }
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructureTests;
/**
 * Supplies {@link FindFileStructureAction.Response} instances to the
 * {@link AbstractStreamableTestCase} base class.
 */
public class FindFileStructureActionResponseTests extends AbstractStreamableTestCase<FindFileStructureAction.Response> {

    /** An empty response, created with the no-arg constructor. */
    @Override
    protected FindFileStructureAction.Response createBlankInstance() {
        return new FindFileStructureAction.Response();
    }

    /** A response wrapping a file structure created by {@link FileStructureTests}. */
    @Override
    protected FindFileStructureAction.Response createTestInstance() {
        return new FindFileStructureAction.Response(
            FileStructureTests.createTestFileStructure());
    }
}

View File

@ -5,16 +5,18 @@
*/ */
package org.elasticsearch.xpack.core.ml.filestructurefinder; package org.elasticsearch.xpack.core.ml.filestructurefinder;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractSerializingTestCase;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class FieldStatsTests extends AbstractXContentTestCase<FieldStats> { public class FieldStatsTests extends AbstractSerializingTestCase<FieldStats> {
@Override
protected FieldStats createTestInstance() { protected FieldStats createTestInstance() {
return createTestFieldStats(); return createTestFieldStats();
} }
@ -51,11 +53,13 @@ public class FieldStatsTests extends AbstractXContentTestCase<FieldStats> {
return new FieldStats(count, cardinality, minValue, maxValue, meanValue, medianValue, topHits); return new FieldStats(count, cardinality, minValue, maxValue, meanValue, medianValue, topHits);
} }
/** Returns a {@link FieldStats} constructor reference as the {@link Writeable.Reader} for this test. */
@Override
protected Writeable.Reader<FieldStats> instanceReader() {
return FieldStats::new;
}
@Override
protected FieldStats doParseInstance(XContentParser parser) { protected FieldStats doParseInstance(XContentParser parser) {
return FieldStats.PARSER.apply(parser, null); return FieldStats.PARSER.apply(parser, null);
} }
protected boolean supportsUnknownFields() {
return false;
}
} }

View File

@ -5,8 +5,10 @@
*/ */
package org.elasticsearch.xpack.core.ml.filestructurefinder; package org.elasticsearch.xpack.core.ml.filestructurefinder;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractSerializingTestCase;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Arrays; import java.util.Arrays;
@ -16,9 +18,14 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
public class FileStructureTests extends AbstractXContentTestCase<FileStructure> { public class FileStructureTests extends AbstractSerializingTestCase<FileStructure> {
@Override
protected FileStructure createTestInstance() { protected FileStructure createTestInstance() {
return createTestFileStructure();
}
public static FileStructure createTestFileStructure() {
FileStructure.Format format = randomFrom(EnumSet.allOf(FileStructure.Format.class)); FileStructure.Format format = randomFrom(EnumSet.allOf(FileStructure.Format.class));
@ -66,24 +73,31 @@ public class FileStructureTests extends AbstractXContentTestCase<FileStructure>
} }
builder.setMappings(mappings); builder.setMappings(mappings);
//if (randomBoolean()) { if (randomBoolean()) {
Map<String, FieldStats> fieldStats = new TreeMap<>(); Map<String, FieldStats> fieldStats = new TreeMap<>();
for (String field : generateRandomStringArray(5, 20, false, false)) { for (String field : generateRandomStringArray(5, 20, false, false)) {
fieldStats.put(field, FieldStatsTests.createTestFieldStats()); fieldStats.put(field, FieldStatsTests.createTestFieldStats());
} }
builder.setFieldStats(fieldStats); builder.setFieldStats(fieldStats);
//} }
builder.setExplanation(Arrays.asList(generateRandomStringArray(10, 150, false, false))); builder.setExplanation(Arrays.asList(generateRandomStringArray(10, 150, false, false)));
return builder.build(); return builder.build();
} }
/** Returns a {@link FileStructure} constructor reference as the {@link Writeable.Reader} for this test. */
@Override
protected Writeable.Reader<FileStructure> instanceReader() {
return FileStructure::new;
}
@Override
protected FileStructure doParseInstance(XContentParser parser) { protected FileStructure doParseInstance(XContentParser parser) {
return FileStructure.PARSER.apply(parser, null).build(); return FileStructure.PARSER.apply(parser, null).build();
} }
protected boolean supportsUnknownFields() { @Override
return false; protected ToXContent.Params getToXContentParams() {
return new ToXContent.MapParams(Collections.singletonMap(FileStructure.EXPLAIN, "true"));
} }
} }

View File

@ -31,10 +31,13 @@ public class MlWithSecurityUserRoleIT extends MlWithSecurityIT {
super.test(); super.test();
// We should have got here if and only if the only ML endpoints in the test were GETs // We should have got here if and only if the only ML endpoints in the test were GETs
// or the find_file_structure API, which is also available to the machine_learning_user
// role
for (ExecutableSection section : testCandidate.getTestSection().getExecutableSections()) { for (ExecutableSection section : testCandidate.getTestSection().getExecutableSections()) {
if (section instanceof DoSection) { if (section instanceof DoSection) {
if (((DoSection) section).getApiCallSection().getApi().startsWith("xpack.ml.") && if (((DoSection) section).getApiCallSection().getApi().startsWith("xpack.ml.") &&
((DoSection) section).getApiCallSection().getApi().startsWith("xpack.ml.get_") == false) { ((DoSection) section).getApiCallSection().getApi().startsWith("xpack.ml.get_") == false &&
((DoSection) section).getApiCallSection().getApi().equals("xpack.ml.find_file_structure") == false) {
fail("should have failed because of missing role"); fail("should have failed because of missing role");
} }
} }

View File

@ -66,6 +66,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
@ -119,6 +120,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction;
import org.elasticsearch.xpack.ml.action.TransportFindFileStructureAction;
import org.elasticsearch.xpack.ml.action.TransportFlushJobAction; import org.elasticsearch.xpack.ml.action.TransportFlushJobAction;
import org.elasticsearch.xpack.ml.action.TransportForecastJobAction; import org.elasticsearch.xpack.ml.action.TransportForecastJobAction;
import org.elasticsearch.xpack.ml.action.TransportGetBucketsAction; import org.elasticsearch.xpack.ml.action.TransportGetBucketsAction;
@ -180,6 +182,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
import org.elasticsearch.xpack.ml.rest.RestMlInfoAction; import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction; import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarEventAction; import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarEventAction;
@ -500,7 +503,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new RestDeleteCalendarJobAction(settings, restController), new RestDeleteCalendarJobAction(settings, restController),
new RestPutCalendarJobAction(settings, restController), new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController), new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController) new RestPostCalendarEventAction(settings, restController),
new RestFindFileStructureAction(settings, restController)
); );
} }
@ -557,7 +561,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class), new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class),
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class) new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class),
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class)
); );
} }
@Override @Override

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.filestructurefinder.FileStructureFinder;
import org.elasticsearch.xpack.ml.filestructurefinder.FileStructureFinderManager;

import java.io.InputStream;
/**
 * Transport action that handles {@link FindFileStructureAction} requests by
 * running a {@link FileStructureFinderManager} over the sample in the request
 * and returning the structure it finds.
 */
public class TransportFindFileStructureAction
    extends HandledTransportAction<FindFileStructureAction.Request, FindFileStructureAction.Response> {

    private final ThreadPool threadPool;

    @Inject
    public TransportFindFileStructureAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
                                            ThreadPool threadPool) {
        super(settings, FindFileStructureAction.NAME, transportService, actionFilters, FindFileStructureAction.Request::new);
        this.threadPool = threadPool;
    }

    /**
     * Hands the work to the ML utility thread pool and reports the outcome
     * through the listener.
     *
     * @param task     the task this execution belongs to (not used here)
     * @param request  the request holding the sample and optional line count
     * @param listener notified with the response, or with the exception on failure
     */
    @Override
    protected void doExecute(Task task, FindFileStructureAction.Request request,
                             ActionListener<FindFileStructureAction.Response> listener) {

        // As determining the file structure might take a while, we run
        // in a different thread to avoid blocking the network thread.
        threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
            try {
                listener.onResponse(buildFileStructureResponse(request));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    /**
     * Runs the structure finder over the request's sample.
     *
     * @param request the request holding the sample and optional line count
     * @return a response wrapping the structure that was found
     * @throws Exception if the structure finder fails at any stage
     */
    private FindFileStructureAction.Response buildFileStructureResponse(FindFileStructureAction.Request request) throws Exception {

        FileStructureFinderManager structureFinderManager = new FileStructureFinderManager();

        // Close the sample stream once analysis has finished, whether it
        // succeeded or threw, rather than leaving it open.
        try (InputStream sampleStream = request.getSample().streamInput()) {
            FileStructureFinder fileStructureFinder =
                structureFinderManager.findFileStructure(request.getLinesToSample(), sampleStream);

            return new FindFileStructureAction.Response(fileStructureFinder.getStructure());
        }
    }
}

View File

@ -35,6 +35,7 @@ import java.util.Set;
public final class FileStructureFinderManager { public final class FileStructureFinderManager {
public static final int MIN_SAMPLE_LINE_COUNT = 2; public static final int MIN_SAMPLE_LINE_COUNT = 2;
public static final int DEFAULT_IDEAL_SAMPLE_LINE_COUNT = 1000;
static final Set<String> FILEBEAT_SUPPORTED_ENCODINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( static final Set<String> FILEBEAT_SUPPORTED_ENCODINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"866", "ansi_x3.4-1968", "arabic", "ascii", "asmo-708", "big5", "big5-hkscs", "chinese", "cn-big5", "cp1250", "cp1251", "cp1252", "866", "ansi_x3.4-1968", "arabic", "ascii", "asmo-708", "big5", "big5-hkscs", "chinese", "cn-big5", "cp1250", "cp1251", "cp1252",
@ -82,16 +83,18 @@ public final class FileStructureFinderManager {
* Given a stream of data from some file, determine its structure. * Given a stream of data from some file, determine its structure.
* @param idealSampleLineCount Ideally, how many lines from the stream will be read to determine the structure? * @param idealSampleLineCount Ideally, how many lines from the stream will be read to determine the structure?
* If the stream has fewer lines then an attempt will still be made, providing at * If the stream has fewer lines then an attempt will still be made, providing at
* least {@link #MIN_SAMPLE_LINE_COUNT} lines can be read. * least {@link #MIN_SAMPLE_LINE_COUNT} lines can be read. If <code>null</code>
* the value of {@link #DEFAULT_IDEAL_SAMPLE_LINE_COUNT} will be used.
* @param fromFile A stream from which the sample will be read. * @param fromFile A stream from which the sample will be read.
* @return A {@link FileStructureFinder} object from which the structure and messages can be queried. * @return A {@link FileStructureFinder} object from which the structure and messages can be queried.
* @throws Exception A variety of problems could occur at various stages of the structure finding process. * @throws Exception A variety of problems could occur at various stages of the structure finding process.
*/ */
public FileStructureFinder findLogStructure(int idealSampleLineCount, InputStream fromFile) throws Exception { public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile) throws Exception {
return findLogStructure(new ArrayList<>(), idealSampleLineCount, fromFile); return findFileStructure(new ArrayList<>(), (idealSampleLineCount == null) ? DEFAULT_IDEAL_SAMPLE_LINE_COUNT : idealSampleLineCount,
fromFile);
} }
public FileStructureFinder findLogStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile) public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile)
throws Exception { throws Exception {
CharsetMatch charsetMatch = findCharset(explanation, fromFile); CharsetMatch charsetMatch = findCharset(explanation, fromFile);

View File

@ -445,7 +445,7 @@ public final class GrokPatternCreator {
@Override @Override
public String processCaptures(Map<String, Integer> fieldNameCountStore, Collection<String> snippets, Collection<String> prefaces, public String processCaptures(Map<String, Integer> fieldNameCountStore, Collection<String> snippets, Collection<String> prefaces,
Collection<String> epilogues, Map<String, Object> mappings, Map<String, FieldStats> fieldStats) { Collection<String> epilogues, Map<String, Object> mappings, Map<String, FieldStats> fieldStats) {
String sampleValue = null; Collection<String> values = new ArrayList<>();
for (String snippet : snippets) { for (String snippet : snippets) {
Map<String, Object> captures = grok.captures(snippet); Map<String, Object> captures = grok.captures(snippet);
// If the pattern doesn't match then captures will be null // If the pattern doesn't match then captures will be null
@ -453,22 +453,24 @@ public final class GrokPatternCreator {
throw new IllegalStateException("[%{" + grokPatternName + "}] does not match snippet [" + snippet + "]"); throw new IllegalStateException("[%{" + grokPatternName + "}] does not match snippet [" + snippet + "]");
} }
prefaces.add(captures.getOrDefault(PREFACE, "").toString()); prefaces.add(captures.getOrDefault(PREFACE, "").toString());
if (sampleValue == null) { values.add(captures.getOrDefault(VALUE, "").toString());
sampleValue = captures.get(VALUE).toString();
}
epilogues.add(captures.getOrDefault(EPILOGUE, "").toString()); epilogues.add(captures.getOrDefault(EPILOGUE, "").toString());
} }
String adjustedFieldName = buildFieldName(fieldNameCountStore, fieldName); String adjustedFieldName = buildFieldName(fieldNameCountStore, fieldName);
if (mappings != null) { if (mappings != null) {
Map<String, String> fullMappingType = Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType); Map<String, String> fullMappingType = Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType);
if ("date".equals(mappingType)) { if ("date".equals(mappingType)) {
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(sampleValue); assert values.isEmpty() == false;
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(values.iterator().next());
if (timestampMatch != null) { if (timestampMatch != null) {
fullMappingType = timestampMatch.getEsDateMappingTypeWithFormat(); fullMappingType = timestampMatch.getEsDateMappingTypeWithFormat();
} }
} }
mappings.put(adjustedFieldName, fullMappingType); mappings.put(adjustedFieldName, fullMappingType);
} }
if (fieldStats != null) {
fieldStats.put(adjustedFieldName, FileStructureUtils.calculateFieldStats(values));
}
return "%{" + grokPatternName + ":" + adjustedFieldName + "}"; return "%{" + grokPatternName + ":" + adjustedFieldName + "}";
} }
} }

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.filestructurefinder.FileStructureFinderManager;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
/**
 * REST handler for the ML "find file structure" endpoint.
 *
 * The handler copies the optional <code>lines_to_sample</code> query parameter and the
 * mandatory request body into a {@link FindFileStructureAction.Request} and executes
 * {@link FindFileStructureAction} through the node client.
 */
public class RestFindFileStructureAction extends BaseRestHandler {

    /**
     * Creates the handler and registers it for <code>POST</code> requests on the
     * <code>find_file_structure</code> path underneath {@link MachineLearning#BASE_PATH}.
     *
     * @param settings   Settings passed through to {@link BaseRestHandler}.
     * @param controller The controller this handler registers itself with.
     */
    public RestFindFileStructureAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(RestRequest.Method.POST, MachineLearning.BASE_PATH + "find_file_structure", this);
    }

    /**
     * @return The fixed name identifying this handler.
     */
    @Override
    public String getName() {
        return "xpack_ml_find_file_structure_action";
    }

    /**
     * Builds the action request from the REST request.
     *
     * @param restRequest The incoming request; it must carry a body, which is used as the sample to analyse.
     * @param client      Client used to execute the action.
     * @return A consumer that executes the action and writes the response to the channel as X-Content.
     * @throws ElasticsearchParseException If the request has no body.
     */
    @Override
    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {

        final FindFileStructureAction.Request findRequest = new FindFileStructureAction.Request();

        // The sample line count is read first, falling back to the manager's default when the parameter is absent
        final String linesToSampleParam = FindFileStructureAction.Request.LINES_TO_SAMPLE.getPreferredName();
        final int linesToSample = restRequest.paramAsInt(linesToSampleParam, FileStructureFinderManager.DEFAULT_IDEAL_SAMPLE_LINE_COUNT);
        findRequest.setLinesToSample(linesToSample);

        // There is nothing to analyse without a body, so reject the request up front
        if (restRequest.hasContent() == false) {
            throw new ElasticsearchParseException("request body is required");
        }
        findRequest.setSample(restRequest.content());

        return restChannel -> client.execute(FindFileStructureAction.INSTANCE, findRequest, new RestToXContentListener<>(restChannel));
    }

    /**
     * @return A singleton set containing {@link FileStructure#EXPLAIN}.
     */
    @Override
    protected Set<String> responseParams() {
        return Collections.singleton(FileStructure.EXPLAIN);
    }
}

View File

@ -0,0 +1,25 @@
{
"xpack.ml.find_file_structure": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-file-structure.html",
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/find_file_structure",
"paths": [ "/_xpack/ml/find_file_structure" ],
"params": {
"lines_to_sample": {
"type": "int",
"description": "Optional parameter to specify how many lines of the file to include in the analysis"
},
"explain": {
"type": "boolean",
"description": "Optional parameter to include a commentary on how the structure was derived"
}
}
},
"body": {
"description" : "The contents of the file to be analyzed",
"required" : true,
"serialize" : "bulk"
}
}
}

View File

@ -0,0 +1,44 @@
---
"Test JSON file structure analysis":
- do:
headers:
# This is to stop the usual content type randomization, which
# would obviously ruin the results for this particular test
Content-Type: "application/json"
xpack.ml.find_file_structure:
body:
- airline: AAL
responsetime: 132.2046
sourcetype: file-structure-test
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: file-structure-test
time: 1403481700
- airline: AAL
responsetime: 134.2046
sourcetype: file-structure-test
time: 1403481800
- match: { num_lines_analyzed: 3 }
- match: { num_messages_analyzed: 3 }
- match: { charset: "UTF-8" }
- match: { has_byte_order_marker: false }
- match: { format: json }
- match: { timestamp_field: time }
- match: { timestamp_formats.0: UNIX }
- match: { need_client_timezone: false }
- match: { mappings.airline.type: keyword }
- match: { mappings.responsetime.type: double }
- match: { mappings.sourcetype.type: keyword }
- match: { mappings.time.type: date }
- match: { mappings.time.format: epoch_second }
- match: { field_stats.airline.count: 3 }
- match: { field_stats.airline.cardinality: 2 }
- match: { field_stats.responsetime.count: 3 }
- match: { field_stats.responsetime.cardinality: 3 }
- match: { field_stats.sourcetype.count: 3 }
- match: { field_stats.sourcetype.cardinality: 1 }
- match: { field_stats.time.count: 3 }
- match: { field_stats.time.cardinality: 3 }
- match: { field_stats.time.cardinality: 3 }