From 5e84cf6ff96d96b75c505d0fa2614d773ea2286f Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Tue, 26 Jan 2021 22:43:50 -0500 Subject: [PATCH] NIFI-8175: Add WindowsEventLogReader controller service This closes #4785 Signed-off-by: David Handermann --- .../windowsevent/WindowsEventLogReader.java | 49 ++ .../WindowsEventLogRecordReader.java | 612 ++++++++++++++++++ .../xml/inference/XmlSchemaInference.java | 2 +- ...g.apache.nifi.controller.ControllerService | 1 + .../additionalDetails.html | 93 +++ .../TestWindowsEventLogRecordReader.java | 197 ++++++ .../windowseventlog/multiple_events.xml | 101 +++ .../windowseventlog/single_event.xml | 42 ++ .../single_event_no_parent.xml | 40 ++ 9 files changed, 1136 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.windowsevent.WindowsEventLogReader/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/windowsevent/TestWindowsEventLogRecordReader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/multiple_events.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event.xml 
create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event_no_parent.xml diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogReader.java new file mode 100644 index 0000000000..d684d5f4d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.windowsevent; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.RecordFieldType; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + + +@Tags({"xml", "windows", "event", "log", "record", "reader", "parser"}) +@CapabilityDescription("Reads Windows Event Log data as XML content having been generated by ConsumeWindowsEventLog, ParseEvtx, etc. (see Additional Details) and creates Record object(s). If the " + + "root tag of the input XML is 'Events', the child content is expected to be a series of 'Event' tags, each of which will constitute a single record. If the root tag is 'Event', the " + + "content is expected to be a single 'Event' and thus a single record. No other root tags are valid. 
Only events of type 'System' are currently supported.") +public class WindowsEventLogReader extends AbstractControllerService implements RecordReaderFactory { + + private static final String DATE_FORMAT = RecordFieldType.DATE.getDefaultFormat(); + private static final String TIME_FORMAT = RecordFieldType.TIME.getDefaultFormat(); + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; // The timestamps have nanoseconds but need a SimpleDateFormat string here + + @Override + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) + throws MalformedRecordException, IOException, SchemaNotFoundException { + return new WindowsEventLogRecordReader(in, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, logger); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogRecordReader.java new file mode 100644 index 0000000000..cf1e23e10a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/windowsevent/WindowsEventLogRecordReader.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.windowsevent; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.inference.TimeValueInference; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.xml.inference.XmlSchemaInference; + +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.Attribute; +import javax.xml.stream.events.Characters; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; 
+import java.util.function.Supplier; + +public class WindowsEventLogRecordReader implements RecordReader { + + private final ComponentLog logger; + private final RecordSchema schema; + private boolean isArray = false; + private XMLEventReader xmlEventReader; + + private StartElement currentRecordStartTag; + // Utility used to infer tag data types + private final XmlSchemaInference xmlSchemaInference; + + private final Supplier LAZY_DATE_FORMAT; + private final Supplier LAZY_TIME_FORMAT; + private final Supplier LAZY_TIMESTAMP_FORMAT; + + private static final String DATA_TAG = "Data"; + private static final String BINARY_TAG = "Binary"; + + private static final SimpleRecordSchema SYSTEM_SCHEMA; + private static final RecordField PROVIDER_GUID_FIELD = new RecordField("Guid", RecordFieldType.STRING.getDataType(), false); + private static final RecordField PROVIDER_NAME_FIELD = new RecordField("Name", RecordFieldType.STRING.getDataType(), false); + private static final RecordField TIME_CREATED_SYSTEMTIME_FIELD = new RecordField("SystemTime", RecordFieldType.STRING.getDataType(), false); + private static final RecordField EXECUTION_THREADID_FIELD = new RecordField("ThreadID", RecordFieldType.INT.getDataType(), true); + private static final RecordField EXECUTION_PROCESSID_FIELD = new RecordField("ProcessID", RecordFieldType.INT.getDataType(), true); + + private static final RecordField EVENT_ID_FIELD = new RecordField("EventID", RecordFieldType.INT.getDataType(), true); + private static final RecordField VERSION_FIELD = new RecordField("Version", RecordFieldType.INT.getDataType(), true); + private static final RecordField LEVEL_FIELD = new RecordField("Level", RecordFieldType.INT.getDataType(), true); + private static final RecordField TASK_FIELD = new RecordField("Task", RecordFieldType.INT.getDataType(), true); + private static final RecordField OPCODE_FIELD = new RecordField("Opcode", RecordFieldType.INT.getDataType(), true); + private static final RecordField 
KEYWORDS_FIELD = new RecordField("Keywords", RecordFieldType.STRING.getDataType(), true); + private static final RecordField EVENTRECORDID_FIELD = new RecordField("EventRecordID", RecordFieldType.INT.getDataType(), true); + private static final RecordField CORRELATION_FIELD = new RecordField("Correlation", RecordFieldType.STRING.getDataType(), true); + private static final RecordField CHANNEL_FIELD = new RecordField("Channel", RecordFieldType.STRING.getDataType(), true); + private static final RecordField COMPUTER_FIELD = new RecordField("Computer", RecordFieldType.STRING.getDataType(), true); + private static final RecordField SECURITY_FIELD = new RecordField("Security", RecordFieldType.STRING.getDataType(), true); + + static { + // Generate the System part of the schema as it is well-defined and static + List systemProviderFields = new ArrayList<>(); + systemProviderFields.add(PROVIDER_GUID_FIELD); + systemProviderFields.add(PROVIDER_NAME_FIELD); + SimpleRecordSchema systemProviderSchema = new SimpleRecordSchema(systemProviderFields); + systemProviderSchema.setSchemaName("Provider"); + + List systemTimeCreatedFields = new ArrayList<>(1); + systemTimeCreatedFields.add(TIME_CREATED_SYSTEMTIME_FIELD); + SimpleRecordSchema systemTimeCreatedSchema = new SimpleRecordSchema(systemTimeCreatedFields); + systemTimeCreatedSchema.setSchemaName("TimeCreated"); + + List systemExecutionFields = new ArrayList<>(2); + systemExecutionFields.add(EXECUTION_THREADID_FIELD); + systemExecutionFields.add(EXECUTION_PROCESSID_FIELD); + SimpleRecordSchema systemExecutionSchema = new SimpleRecordSchema(systemExecutionFields); + systemExecutionSchema.setSchemaName("Execution"); + + List systemFields = new ArrayList<>(14); + systemFields.add(new RecordField("Provider", RecordFieldType.RECORD.getRecordDataType(systemProviderSchema))); + systemFields.add(EVENT_ID_FIELD); + systemFields.add(VERSION_FIELD); + systemFields.add(LEVEL_FIELD); + systemFields.add(TASK_FIELD); + 
systemFields.add(OPCODE_FIELD); + systemFields.add(KEYWORDS_FIELD); + systemFields.add(new RecordField("TimeCreated", RecordFieldType.RECORD.getRecordDataType(systemTimeCreatedSchema))); + systemFields.add(EVENTRECORDID_FIELD); + systemFields.add(CORRELATION_FIELD); + systemFields.add(new RecordField("Execution", RecordFieldType.RECORD.getRecordDataType(systemExecutionSchema))); + systemFields.add(CHANNEL_FIELD); + systemFields.add(COMPUTER_FIELD); + systemFields.add(SECURITY_FIELD); + + SYSTEM_SCHEMA = new SimpleRecordSchema(systemFields); + SYSTEM_SCHEMA.setSchemaName("System"); + } + + + public WindowsEventLogRecordReader(InputStream in, final String dateFormat, final String timeFormat, final String timestampFormat, ComponentLog logger) + throws IOException, MalformedRecordException { + + this.logger = logger; + + final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + final DateFormat tsf = timestampFormat == null ? 
null : DataTypeUtils.getDateFormat(timestampFormat); + + LAZY_DATE_FORMAT = () -> df; + LAZY_TIME_FORMAT = () -> tf; + LAZY_TIMESTAMP_FORMAT = () -> tsf; + + final FilterInputStream inputStream; + final XMLInputFactory xmlInputFactory; + try { + xmlInputFactory = XMLInputFactory.newInstance(); + // Avoid XXE Vulnerabilities + xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false); + xmlInputFactory.setProperty("javax.xml.stream.isSupportingExternalEntities", false); + + inputStream = new NonCloseableInputStream(in); + inputStream.mark(Integer.MAX_VALUE); + xmlEventReader = xmlInputFactory.createXMLEventReader(inputStream); + xmlSchemaInference = new XmlSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat)); + } catch (XMLStreamException e) { + throw new MalformedRecordException("Error creating XML Event reader from FlowFile input stream", e); + } + + try { + // Do a streaming pass through the input looking for tags, then reset the input + schema = determineSchema(); + } catch (XMLStreamException e) { + throw new MalformedRecordException("Error reading records to determine the FlowFile's RecordSchema", e); + } + + try { + // Restart the XML event stream and advance to the first Event tag + inputStream.reset(); + xmlEventReader = xmlInputFactory.createXMLEventReader(inputStream); + if (isArray) { + skipToNextStartTag(); + } + setNextRecordStartTag(); + } catch (XMLStreamException e) { + throw new MalformedRecordException("Error resetting the XML input stream to the first Windows Log Event, current XML tag = " + currentRecordStartTag, e); + } + } + + @Override + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { + if (currentRecordStartTag == null) { + return null; + } + try { + final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields); + setNextRecordStartTag(); + if (record != null) { + return record; + } 
else { + return new MapRecord(this.schema, Collections.emptyMap()); + } + } catch (XMLStreamException e) { + throw new MalformedRecordException("Could not parse XML", e); + } + } + + @Override + public RecordSchema getSchema() throws MalformedRecordException { + return schema; + } + + @Override + public void close() throws IOException { + try { + xmlEventReader.close(); + } catch (XMLStreamException e) { + logger.error("Unable to close XMLEventReader"); + } + } + + private RecordSchema determineSchema() throws XMLStreamException { + + setNextRecordStartTag(); + if (currentRecordStartTag == null) { + throw new XMLStreamException("No root tag found, must be one of or "); + } + + if (currentRecordStartTag.getName().getLocalPart().equals("Events")) { + isArray = true; + setNextRecordStartTag(); + } + + // If there was an tag pair with no Events in it, use the default schema + if (currentRecordStartTag == null) { + return generateFullSchema(new SimpleRecordSchema(Collections.emptyList())); + } + + List dataFields = new ArrayList<>(); + List dataFieldNames = new ArrayList<>(); + while (currentRecordStartTag != null) { + if (!currentRecordStartTag.getName().getLocalPart().equals("Event")) { + // Unknown and invalid tag + throw new XMLStreamException("Expecting tag but found unknown/invalid tag " + currentRecordStartTag.getName().getLocalPart()); + } + + setNextRecordStartTag(); + // At an Event tag, skip the event log type tag (System, e.g.), go into EventData tag then add all the Data tags to the partial schema + while (currentRecordStartTag != null && !currentRecordStartTag.getName().getLocalPart().equals("EventData")) { + skipElement(); + setNextRecordStartTag(); + } + + if (currentRecordStartTag == null) { + throw new XMLStreamException("Expecting tag but found none"); + } + + setNextRecordStartTag(); + if (currentRecordStartTag == null) { + // There was an tag but no Data/Binary tags, so this record has been fully processed + continue; + } + + String 
eventDataElementName = currentRecordStartTag.getName().getLocalPart(); + dataFields.addAll(getDataFieldsFrom(eventDataElementName, dataFieldNames)); + } + + return generateFullSchema(new SimpleRecordSchema(dataFields)); + } + + private List getDataFieldsFrom(String eventDataElementName, List dataFieldNames) throws XMLStreamException { + List dataFields = new ArrayList<>(); + while (DATA_TAG.equals(eventDataElementName)) { + // Save reference to Data start element so we can continue to get the value/content + StartElement dataElement = currentRecordStartTag; + String content = getContent(); + + // Create field for the data point using attribute "Name" + String dataFieldName; + final Iterator iterator = dataElement.getAttributes(); + if (!iterator.hasNext()) { + // If no Name attribute is provided, the Name is the content of the Data tag and there should be a following Binary tag + dataFieldName = content; + setNextRecordStartTag(); + eventDataElementName = currentRecordStartTag.getName().getLocalPart(); + if (!BINARY_TAG.equals(eventDataElementName)) { + throw new XMLStreamException("Expecting tag containing data for element: " + dataFieldName); + } + content = getContent(); + + } else { + final Attribute attribute = (Attribute) iterator.next(); + final String attributeName = attribute.getName().getLocalPart(); + if (!"Name" .equals(attributeName)) { + throw new XMLStreamException("Expecting 'Name' attribute, actual: " + attributeName); + } + dataFieldName = attribute.getValue(); + } + // Skip this if it has been processed in a previous record + if (!dataFieldNames.contains(dataFieldName)) { + final DataType dataElementDataType = xmlSchemaInference.inferTextualDataType(content); + RecordField newRecordField = new RecordField(dataFieldName, dataElementDataType, true); + dataFields.add(newRecordField); + dataFieldNames.add(dataFieldName); + } + + // Advance to next data point (or end of EventData) + setNextRecordStartTag(); + eventDataElementName = 
currentRecordStartTag == null ? null : currentRecordStartTag.getName().getLocalPart(); + } + return dataFields; + } + + private void skipElement() throws XMLStreamException { + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + + if (xmlEvent.isStartElement()) { + skipElement(); + } + if (xmlEvent.isEndElement()) { + return; + } + } + } + + private void skipToNextStartTag() throws XMLStreamException { + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isStartElement()) { + return; + } + } + } + + private void setNextRecordStartTag() throws XMLStreamException { + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isStartElement()) { + currentRecordStartTag = xmlEvent.asStartElement(); + return; + } + } + currentRecordStartTag = null; + } + + private RecordSchema generateFullSchema(final RecordSchema dataElementsSchema) { + final SimpleRecordSchema rootSchema; + + List fullSchemaFields = new ArrayList<>(2); + fullSchemaFields.add(new RecordField("System", RecordFieldType.RECORD.getRecordDataType(SYSTEM_SCHEMA))); + fullSchemaFields.add(new RecordField("EventData", RecordFieldType.RECORD.getRecordDataType(dataElementsSchema))); + + SimpleRecordSchema fullSchema = new SimpleRecordSchema(fullSchemaFields); + fullSchema.setSchemaName("Event"); + + rootSchema = fullSchema; + return rootSchema; + } + + private String getContent() throws XMLStreamException { + StringBuilder content = new StringBuilder(); + while (xmlEventReader.hasNext()) { + XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isCharacters()) { + final Characters characters = xmlEvent.asCharacters(); + if (!characters.isWhiteSpace()) { + content.append(characters.getData()); + } + } else if (xmlEvent.isEndElement()) { + break; + } else if (xmlEvent.isStartElement()) { + this.skipElement(); + } + } + return content.toString(); + } + + private 
Record parseRecord(StartElement startElement, RecordSchema schema, boolean coerceTypes, boolean dropUnknown) throws XMLStreamException, MalformedRecordException { + final Map recordValues = new HashMap<>(); + + // parse attributes + final Iterator iterator = startElement.getAttributes(); + while (iterator.hasNext()) { + final Attribute attribute = (Attribute) iterator.next(); + final String targetFieldName = attribute.getName().toString(); + + if (dropUnknown) { + final Optional field = schema.getField(targetFieldName); + if (field.isPresent()) { + + // dropUnknown == true && coerceTypes == true + if (coerceTypes) { + final Object value; + final DataType dataType = field.get().getDataType(); + if ((value = parseStringForType(attribute.getValue(), targetFieldName, dataType)) != null) { + recordValues.put(targetFieldName, value); + } + + // dropUnknown == true && coerceTypes == false + } else { + recordValues.put(targetFieldName, attribute.getValue()); + } + } + } else { + + // dropUnknown == false && coerceTypes == true + if (coerceTypes) { + final Object value; + final Optional field = schema.getField(targetFieldName); + if (field.isPresent()) { + if ((value = parseStringForType(attribute.getValue(), targetFieldName, field.get().getDataType())) != null) { + recordValues.put(targetFieldName, value); + } + } else { + recordValues.put(targetFieldName, attribute.getValue()); + } + + // dropUnknown == false && coerceTypes == false + } else { + recordValues.put(targetFieldName, attribute.getValue()); + } + } + } + + // parse fields + while (xmlEventReader.hasNext()) { + final XMLEvent xmlEvent = xmlEventReader.nextEvent(); + + if (xmlEvent.isStartElement()) { + final StartElement subStartElement = xmlEvent.asStartElement(); + String fieldName = subStartElement.getName().getLocalPart(); + + final Optional field = schema.getField(fieldName); + + if (dropUnknown) { + if (field.isPresent()) { + // dropUnknown == true && coerceTypes == true + if (coerceTypes) { + final Object 
value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, true); + if (value != null) { + recordValues.put(fieldName, value); + } + + // dropUnknown == true && coerceTypes == false + // subElements of subStartElement can only be known if there is a corresponding field in the schema defined as record + } + } else { + if (DATA_TAG.equals(fieldName)) { + Map namedValue = parseDataField(subStartElement, schema, true); + if (namedValue != null && !namedValue.isEmpty()) { + final String name = namedValue.keySet().iterator().next(); + recordValues.put(name, namedValue.get(name)); + } + } else { + skipElement(); + } + } + } else { + // dropUnknown == false && coerceTypes == true + if (coerceTypes) { + if (field.isPresent()) { + final Object value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, false); + if (value != null) { + recordValues.put(fieldName, value); + } + } else { + if (DATA_TAG.equals(fieldName)) { + Map namedValue = parseDataField(subStartElement, schema, dropUnknown); + if (namedValue != null && !namedValue.isEmpty()) { + final String name = namedValue.keySet().iterator().next(); + recordValues.put(name, namedValue.get(name)); + } + } + } + + // dropUnknown == false && coerceTypes == false + } else { + if (DATA_TAG.equals(fieldName)) { + Map namedValue = parseDataField(subStartElement, schema, dropUnknown); + if (namedValue != null && !namedValue.isEmpty()) { + final String name = namedValue.keySet().iterator().next(); + recordValues.put(name, namedValue.get(name)); + } + } + } + } + } else if (xmlEvent.isEndElement()) { + break; + } + } + + for (final Map.Entry entry : recordValues.entrySet()) { + if (entry.getValue() instanceof List) { + recordValues.put(entry.getKey(), ((List) entry.getValue()).toArray()); + } + } + + if (recordValues.size() > 0) { + return new MapRecord(schema, recordValues); + } else { + return null; + } + } + + private Object parseStringForType(String data, 
String fieldName, DataType dataType) { + switch (dataType.getFieldType()) { + case BOOLEAN: + case BYTE: + case CHAR: + case DECIMAL: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case SHORT: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: { + return DataTypeUtils.convertType(data, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } + } + return null; + } + + private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map recordValues, + boolean dropUnknown) throws XMLStreamException, MalformedRecordException { + switch (dataType.getFieldType()) { + case BOOLEAN: + case BYTE: + case CHAR: + case DECIMAL: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case SHORT: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: { + + StringBuilder content = new StringBuilder(); + + while (xmlEventReader.hasNext()) { + XMLEvent xmlEvent = xmlEventReader.nextEvent(); + if (xmlEvent.isCharacters()) { + final Characters characters = xmlEvent.asCharacters(); + if (!characters.isWhiteSpace()) { + content.append(characters.getData()); + } + } else if (xmlEvent.isEndElement()) { + final String contentToReturn = content.toString(); + + if (!StringUtils.isBlank(contentToReturn)) { + return DataTypeUtils.convertType(content.toString(), dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName); + } else { + return null; + } + + } else if (xmlEvent.isStartElement()) { + this.skipElement(); + } + } + break; + } + + case RECORD: { + final RecordSchema childSchema; + if (dataType instanceof RecordDataType) { + childSchema = ((RecordDataType) dataType).getChildSchema(); + } else { + return null; + } + + return parseRecord(startElement, childSchema, true, dropUnknown); + } + } + return null; + } + + private Map parseDataField(StartElement startElement, RecordSchema schema, boolean dropUnknown) throws XMLStreamException { + // Save reference to Data start element so we 
can continue to get the value/content + String content = getContent(); + + // Create field for the data point using attribute "Name" + String dataFieldName; + final Iterator iterator = startElement.getAttributes(); + if (!iterator.hasNext()) { + // If no Name attribute is provided, the Name is the content of the Data tag and there should be a following Binary tag + dataFieldName = content; + setNextRecordStartTag(); + String eventDataElementName = currentRecordStartTag.getName().getLocalPart(); + if (!BINARY_TAG.equals(eventDataElementName)) { + throw new XMLStreamException("Expecting tag containing data for element: " + dataFieldName); + } + content = getContent(); + + } else { + final Attribute attribute = (Attribute) iterator.next(); + final String attributeName = attribute.getName().getLocalPart(); + if (!"Name" .equals(attributeName)) { + throw new XMLStreamException("Expecting 'Name' attribute, actual: " + attributeName); + } + dataFieldName = attribute.getValue(); + } + + Optional rf = schema.getField(dataFieldName); + if (rf.isPresent()) { + return Collections.singletonMap(dataFieldName, + DataTypeUtils.convertType(content, rf.get().getDataType(), LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, dataFieldName)); + } else if (dropUnknown) { + return Collections.emptyMap(); + } else { + return Collections.singletonMap(dataFieldName, content); + } + } +} + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java index 0e2ccab893..c4e2e13d72 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java @@ -48,7 +48,7 @@ public class XmlSchemaInference extends HierarchicalSchemaInference { return inferTextualDataType(text); } - private DataType inferTextualDataType(final String text) { + public DataType inferTextualDataType(final String text) { if (text == null || text.isEmpty()) { return null; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 18bab0567b..88745e3bef 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -35,5 +35,6 @@ org.apache.nifi.syslog.Syslog5424Reader org.apache.nifi.xml.XMLReader org.apache.nifi.xml.XMLRecordSetWriter +org.apache.nifi.windowsevent.WindowsEventLogReader org.apache.nifi.schema.inference.VolatileSchemaCache \ No newline at end of file diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.windowsevent.WindowsEventLogReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.windowsevent.WindowsEventLogReader/additionalDetails.html new file mode 100644 index 0000000000..7c85a5ce8e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.windowsevent.WindowsEventLogReader/additionalDetails.html @@ -0,0 +1,93 @@ + + + + + + WindowsEventLogReader + + + + + + +

Description:

+

This controller service is used to parse Windows Event Log events in the form of XML input (possibly from ConsumeWindowsEventLog or ParseEvtx). +

+

Input XML Example:

+

+

+<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
+  <System>
+    <Provider Name="Service Control Manager" Guid="{555908d1-a6d7-4695-8e1e-26931d2012f4}" EventSourceName="Service Control Manager"/>
+    <EventID Qualifiers="16384">7036</EventID>
+    <Version>0</Version>
+    <Level>4</Level>
+    <Task>0</Task>
+    <Opcode>0</Opcode>
+    <Keywords>0x8080000000000000</Keywords>
+    <TimeCreated SystemTime="2016-06-10T22:28:53.905233700Z"/>
+    <EventRecordID>34153</EventRecordID>
+    <Correlation/>
+    <Execution ProcessID="684" ThreadID="3504"/>
+    <Channel>System</Channel>
+    <Computer>WIN-O05CNUCF16M.hdf.local</Computer>
+    <Security/>
+  </System>
+  <EventData>
+    <Data Name="param1">Smart Card Device Enumeration Service</Data>
+    <Data>param2</Data>
+    <Binary>5300630044006500760069006300650045006E0075006D002F0034000000</Binary>
+  </EventData>
+</Event>
+             
+

+

Output example (using ConvertRecord with JsonRecordSetWriter):

+

+

+[ {
+  "System" : {
+    "Provider" : {
+      "Guid" : "{555908d1-a6d7-4695-8e1e-26931d2012f4}",
+      "Name" : "Service Control Manager"
+    },
+    "EventID" : 7036,
+    "Version" : 0,
+    "Level" : 4,
+    "Task" : 0,
+    "Opcode" : 0,
+    "Keywords" : "0x8080000000000000",
+    "TimeCreated" : {
+      "SystemTime" : "2016-06-10T22:28:53.905233700Z"
+    },
+    "EventRecordID" : 34153,
+    "Correlation" : null,
+    "Execution" : {
+      "ThreadID" : 3504,
+      "ProcessID" : 684
+    },
+    "Channel" : "System",
+    "Computer" : "WIN-O05CNUCF16M.hdf.local",
+    "Security" : null
+  },
+  "EventData" : {
+    "param1" : "Smart Card Device Enumeration Service",
+    "param2" : "5300630044006500760069006300650045006E0075006D002F0034000000"
+  }
+} ]
+        
+

+ + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/windowsevent/TestWindowsEventLogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/windowsevent/TestWindowsEventLogRecordReader.java new file mode 100644 index 0000000000..2e408c429a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/windowsevent/TestWindowsEventLogRecordReader.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.windowsevent; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestWindowsEventLogRecordReader { + + private static final String DATE_FORMAT = RecordFieldType.DATE.getDefaultFormat(); + private static final String TIME_FORMAT = RecordFieldType.TIME.getDefaultFormat(); + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS"; // The timestamps have nanoseconds but need a SimpleDateFormat string here + + @Test + public void testSingleEvent() throws IOException, MalformedRecordException { + InputStream is = new BufferedInputStream(new FileInputStream("src/test/resources/windowseventlog/single_event.xml")); + WindowsEventLogRecordReader reader = new WindowsEventLogRecordReader(is, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, Mockito.mock(ComponentLog.class)); + Record r = reader.nextRecord(); + assertNotNull(r); + assertEquals(2, r.getValues().length); + + // Verify some System fields + Object childObj = r.getValue("System"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + Record childRecord = (Record) childObj; + assertEquals(14, childRecord.getValues().length); + childObj = childRecord.getValue("Provider"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(2, childRecord.getValues().length); + 
assertEquals("Microsoft-Windows-Security-Auditing", childRecord.getAsString("Name")); + + // Verify some EventData fields, including Data fields + childObj = r.getValue("EventData"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(4, childRecord.getValues().length); + assertEquals("DOMAIN", childRecord.getAsString("TargetDomainName")); + + assertNull(reader.nextRecord()); + } + + @Test + public void testSingleEventNoParent() throws IOException, MalformedRecordException { + InputStream is = new BufferedInputStream(new FileInputStream("src/test/resources/windowseventlog/single_event_no_parent.xml")); + WindowsEventLogRecordReader reader = new WindowsEventLogRecordReader(is, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, Mockito.mock(ComponentLog.class)); + Record r = reader.nextRecord(); + assertNotNull(r); + assertEquals(2, r.getValues().length); + + // Verify some System fields + Object childObj = r.getValue("System"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + Record childRecord = (Record) childObj; + assertEquals(14, childRecord.getValues().length); + childObj = childRecord.getValue("Provider"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(2, childRecord.getValues().length); + assertEquals("Microsoft-Windows-Security-Auditing", childRecord.getAsString("Name")); + + // Verify some EventData fields, including Data fields + childObj = r.getValue("EventData"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(4, childRecord.getValues().length); + assertEquals("DOMAIN", childRecord.getAsString("TargetDomainName")); + + assertNull(reader.nextRecord()); + } + + @Test + public void testMultipleEvents() throws IOException, MalformedRecordException { + InputStream is = new BufferedInputStream(new 
FileInputStream("src/test/resources/windowseventlog/multiple_events.xml")); + WindowsEventLogRecordReader reader = new WindowsEventLogRecordReader(is, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, Mockito.mock(ComponentLog.class)); + Record r = reader.nextRecord(true, true); + assertNotNull(r); + assertEquals(2, r.getValues().length); + + // Verify some System fields + Object childObj = r.getValue("System"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + Record childRecord = (Record) childObj; + assertEquals(14, childRecord.getValues().length); + childObj = childRecord.getValue("Provider"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(2, childRecord.getValues().length); + assertEquals("Microsoft-Windows-Security-Auditing", childRecord.getAsString("Name")); + + // Verify some EventData fields, including Data fields + childObj = r.getValue("EventData"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(16, childRecord.getValues().length); + assertEquals("DOMAIN", childRecord.getAsString("TargetDomainName")); + assertNull(childRecord.getValue("SubjectUserName")); + + // Verify next record + r = reader.nextRecord(true, true); + assertNotNull(r); + assertEquals(2, r.getValues().length); + childObj = r.getValue("System"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(14, childRecord.getValues().length); + childObj = childRecord.getValue("Provider"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(2, childRecord.getValues().length); + assertEquals("Microsoft-Windows-Security-Auditing", childRecord.getAsString("Name")); + + // Verify some EventData fields, including Data fields + childObj = r.getValue("EventData"); + assertNotNull(childObj); + assertTrue(childObj 
instanceof Record); + childRecord = (Record) childObj; + assertEquals(16, childRecord.getValues().length); + assertEquals("DOMAIN", childRecord.getAsString("TargetDomainName")); + assertEquals("-", childRecord.getValue("SubjectUserName")); + + // Verify next record + r = reader.nextRecord(true, false); + assertNotNull(r); + assertEquals(2, r.getValues().length); + childObj = r.getValue("System"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(14, childRecord.getValues().length); + childObj = childRecord.getValue("Provider"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(2, childRecord.getValues().length); + assertEquals("Microsoft-Windows-Security-Auditing", childRecord.getAsString("Name")); + + // Verify some EventData fields, including Data fields + childObj = r.getValue("EventData"); + assertNotNull(childObj); + assertTrue(childObj instanceof Record); + childRecord = (Record) childObj; + assertEquals(16, childRecord.getValues().length); + assertNull(childRecord.getAsString("TargetDomainName")); + assertEquals("User", childRecord.getValue("SubjectUserName")); + assertEquals("D9060000", childRecord.getValue("SessionEnv")); + + assertNull(reader.nextRecord()); + } + + @Test(expected = MalformedRecordException.class) + public void testNotXmlInput() throws IOException, MalformedRecordException { + InputStream is = new ByteArrayInputStream("I am not XML" .getBytes(StandardCharsets.UTF_8)); + WindowsEventLogRecordReader reader = new WindowsEventLogRecordReader(is, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, Mockito.mock(ComponentLog.class)); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/multiple_events.xml 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/multiple_events.xml new file mode 100644 index 0000000000..5b31fd961c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/multiple_events.xml @@ -0,0 +1,101 @@ + + + + + + + 4634 + 0 + 0 + 12545 + 0 + 0x8020000000000000 + + 239422 + + + Security + my.domain.com + + + + MYUSER$ + DOMAIN + 0x1624842 + 3 + + + + + + 4624 + 1 + 0 + 12544 + 0 + 0x8020000000000000 + + 239424 + + + Security + my.domain.com + + + + - + - + 0x0 + MYUSER$ + DOMAIN + 3 + - + {5EC82F53-CD1C-063B-EFC8-A191FD31993F} + - + - + 0 + 0x0 + - + Impersonation + + + + + + 5555 + 1 + 0 + 12345 + 0 + 0x8020000000000000 + + 239424 + + + Security + my.domain.com + + + + User + DOMAIN + SessionEnv + D9060000 + 0x0 + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event.xml new file mode 100644 index 0000000000..7c2fafbf0f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event.xml @@ -0,0 +1,42 @@ + + + + + + + 4634 + 0 + 0 + 12545 + 0 + 0x8020000000000000 + + 239422 + + + Security + my.domain.com + + + + MYUSER$ + DOMAIN + 0x1624842 + 3 + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event_no_parent.xml 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event_no_parent.xml new file mode 100644 index 0000000000..161f393531 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/windowseventlog/single_event_no_parent.xml @@ -0,0 +1,40 @@ + + + + + + 4634 + 0 + 0 + 12545 + 0 + 0x8020000000000000 + + 239422 + + + Security + my.domain.com + + + + MYUSER$ + DOMAIN + 0x1624842 + 3 + +