NIFI-4081 - Added raw message option in GrokReader

This closes #1921.
This commit is contained in:
Pierre Villard 2017-06-16 00:09:45 +02:00 committed by Mark Payne
parent 1f67cbf628
commit 6559604456
3 changed files with 57 additions and 14 deletions

View File

@ -61,7 +61,8 @@ import io.thekraken.grok.api.exception.GrokException;
+ "If a line in the input does not match the expected message pattern, the line of text is either considered to be part of the previous "
+ "message or is skipped, depending on the configuration, with the exception of stack traces. A stack trace that is found at the end of "
+ "a log message is considered to be part of the previous message but is added to the 'stackTrace' field of the Record. If a record has "
+ "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String).")
+ "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). "
+ "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.")
public class GrokReader extends SchemaRegistryService implements RecordReaderFactory {
private volatile Grok grok;
private volatile boolean appendUnmatchedLine;
@ -150,6 +151,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
populateSchemaFieldNames(grok, grokExpression, fields);
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
return schema;
@ -241,4 +243,4 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
final RecordSchema schema = getSchema(variables, in, null);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
}
}
}

View File

@ -50,6 +50,8 @@ public class GrokRecordReader implements RecordReader {
private String nextLine;
static final String STACK_TRACE_COLUMN_NAME = "stackTrace";
static final String RAW_MESSAGE_NAME = "_raw";
private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
"^\\s*(?:(?: |\\t)+at )|"
+ "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
@ -73,8 +75,11 @@ public class GrokRecordReader implements RecordReader {
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
Map<String, Object> valueMap = null;
StringBuilder raw = new StringBuilder();
while (valueMap == null || valueMap.isEmpty()) {
final String line = nextLine == null ? reader.readLine() : nextLine;
raw.append(line);
nextLine = null; // ensure that we don't process nextLine again
if (line == null) {
return null;
@ -98,9 +103,11 @@ public class GrokRecordReader implements RecordReader {
// the stack trace ends. Otherwise, append the next line to the last field in the record.
if (isStartOfStackTrace(nextLine)) {
stackTrace = readStackTrace(nextLine);
raw.append("\n").append(stackTrace);
break;
} else if (append) {
trailingText.append("\n").append(nextLine);
raw.append("\n").append(nextLine);
}
} else {
// The next line matched our pattern.
@ -108,11 +115,11 @@ public class GrokRecordReader implements RecordReader {
}
}
final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields);
final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields);
return record;
}
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) {
private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) {
final Map<String, Object> converted = new HashMap<>();
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
@ -179,6 +186,8 @@ public class GrokRecordReader implements RecordReader {
}
converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
converted.put(RAW_MESSAGE_NAME, raw);
return new MapRecord(schema, converted);
}
@ -257,4 +266,4 @@ public class GrokRecordReader implements RecordReader {
return schema;
}
}
}

View File

@ -52,19 +52,24 @@ public class TestGrokRecordReader {
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
final String[] rawMessages = new String[] {"2016-11-08 21:24:23,029 INFO Test Message 1",
"2016-11-08 21:24:23,029 WARN Red", "2016-11-08 21:24:23,029 ERROR Green",
"2016-11-08 21:24:23,029 FATAL Blue", "2016-11-08 21:24:23,029 FINE Yellow"};
for (int i = 0; i < logLevels.length; i++) {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
assertEquals("2016-11-08 21:24:23,029", values[0]);
assertEquals(logLevels[i], values[1]);
assertEquals(messages[i], values[2]);
assertNull(values[3]);
assertEquals(rawMessages[i], values[4]);
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -83,13 +88,16 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(6, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals("2016-08-04 13:26:32,473", values[0]);
assertEquals("INFO", values[1]);
assertEquals("Leader Election Notification Thread-1", values[2]);
assertEquals("o.a.n.LoggerClass", values[3]);
assertEquals("", values[4]);
assertEquals("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces", values[5]);
assertEquals(msg, values[6]);
deserializer.close();
}
@ -109,12 +117,14 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertNull(values[5]);
assertNotNull(values[6]);
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -134,18 +144,23 @@ public class TestGrokRecordReader {
final Object[] values = record.getValues();
assertNotNull(values);
assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
if ("ERROR".equals(values[1])) {
final String msg = (String) values[4];
assertEquals("One\nTwo\nThree", msg);
assertNotNull(values[5]);
assertTrue(values[6].toString().startsWith("2016-08-04 13:26:32,474 ERROR [Leader Election Notification Thread-2] o.apache.nifi.controller.FlowController One"));
assertTrue(values[6].toString().endsWith(" at org.apache.nifi.cluster."
+ "coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) "
+ "[nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]\n ... 12 common frames omitted"));
} else {
assertNull(values[5]);
}
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -168,9 +183,10 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertEquals(messages[i], values[2]);
assertNotNull(values[4]);
if (values[1].equals("ERROR")) {
final String stackTrace = (String) values[3];
@ -182,10 +198,21 @@ public class TestGrokRecordReader {
assertTrue(stackTrace.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
assertTrue(stackTrace.endsWith(" ... 12 common frames omitted"));
final String raw = (String) values[4];
assertTrue(raw.startsWith("2016-11-23 16:00:02,689 ERROR Log message with stack trace"));
assertTrue(raw.contains("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
assertTrue(raw.contains(" at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
assertTrue(raw.contains("Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
assertTrue(raw.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
assertTrue(raw.endsWith(" ... 12 common frames omitted"));
}
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -201,7 +228,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(8, fieldNames.size());
assertEquals(9, fieldNames.size());
assertTrue(fieldNames.contains("timestamp"));
assertTrue(fieldNames.contains("logsource"));
assertTrue(fieldNames.contains("facility"));
@ -210,6 +237,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("pid"));
assertTrue(fieldNames.contains("message"));
assertTrue(fieldNames.contains("stackTrace")); // always implicitly there
assertTrue(fieldNames.contains("_raw")); // always implicitly there
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true);
final Record record = deserializer.nextRecord();
@ -221,8 +249,10 @@ public class TestGrokRecordReader {
assertEquals("nifi", record.getValue("program"));
assertEquals("12345", record.getValue("pid"));
assertEquals("My Message", record.getValue("message"));
assertEquals("May 22 15:58:23 my-host nifi[12345]:My Message", record.getValue("_raw"));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -241,7 +271,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(6, fieldNames.size());
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
@ -258,6 +288,7 @@ public class TestGrokRecordReader {
assertEquals("5", record.getValue("fifth"));
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
@ -276,7 +307,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
assertEquals(6, fieldNames.size());
assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
@ -295,6 +326,7 @@ public class TestGrokRecordReader {
}
assertNull(deserializer.nextRecord());
deserializer.close();
}
}
}
}