diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java index c73aa9146b..07bc9ec63f 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java @@ -140,7 +140,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor { SUCCESS, FAILURE, GRAPH ))); - public static final String RECORD_COUNT = "records.count"; + public static final String RECORD_COUNT = "record.count"; public static final String GRAPH_OPERATION_TIME = "graph.operations.took"; private volatile RecordPathCache recordPathCache; @@ -167,12 +167,25 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor { recordPathCache = new RecordPathCache(100); } - private List getRecordValue(Record record, RecordPath recordPath){ + private Object getRecordValue(Record record, RecordPath recordPath){ final RecordPathResult result = recordPath.evaluate(record); - return result.getSelectedFields() - .filter(fv -> fv.getValue() != null) - .map(FieldValue::getValue) - .collect( Collectors.toList()); + final List values = result.getSelectedFields().collect(Collectors.toList()); + if (values != null && !values.isEmpty()) { + if (values.size() == 1) { + Object raw = values.get(0).getValue(); + + if (raw != null && raw.getClass().isArray()) { + Object[] arr = (Object[]) raw; + raw = Arrays.asList(arr); + } + + return raw; + } else { + return values.stream().map(fv -> fv.getValue()).collect(Collectors.toList()); + } + } else { + return null; + } } @Override @@ -211,6 +224,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor { Record record; long start = System.currentTimeMillis(); failedWriter.beginRecordSet(); + int records = 0; while ((record = reader.nextRecord()) != null) { FlowFile graph = session.create(input); @@ -223,6 +237,9 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor { } dynamicPropertyMap.putAll(input.getAttributes()); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Dynamic Properties: {}", new Object[]{dynamicPropertyMap}); + } List> graphResponses = new ArrayList<>(executeQuery(recordScript, dynamicPropertyMap)); OutputStream graphOutputStream = session.write(graph); @@ -231,15 +248,18 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor { graphOutputStream.close(); session.transfer(graph, GRAPH); } catch (Exception e) { + getLogger().error("Error processing record at index " + records, e); // write failed records to a flowfile destined for the failure relationship failedWriter.write(record); session.remove(graph); + } finally { + records++; } } long end = System.currentTimeMillis(); delta = (end - start) / 1000; if (getLogger().isDebugEnabled()){ - getLogger().debug(String.format("Took %s seconds.", delta)); + getLogger().debug(String.format("Took %s seconds.\nHandled %d records", delta, records)); } failedWriteResult = failedWriter.finishRecordSet(); failedWriter.flush(); diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java index d16507c5a2..b83242333b 100644 --- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java +++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.ArrayList; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ExecuteGraphQueryRecordTest { @@ -67,7 +68,7 @@ public class ExecuteGraphQueryRecordTest { byte[] json = JsonOutput.toJson(test).getBytes(); String submissionScript; - submissionScript = "[ 'M': M[0] ]"; + submissionScript = "[ 'M': M ]"; runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); runner.setProperty("M", "/M"); @@ -98,7 +99,7 @@ public class ExecuteGraphQueryRecordTest { byte[] json = JsonOutput.toJson(test).getBytes(); String submissionScript = "[ " + - "'M': M[0] " + + "'M': M " + "]"; runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); @@ -131,8 +132,8 @@ public class ExecuteGraphQueryRecordTest { byte[] json = JsonOutput.toJson(test).getBytes(); String submissionScript = "Map vertexHashes = new HashMap()\n" + - "vertexHashes.put('1234', tMap[0])\n" + - "[ 'L': L[0], 'result': vertexHashes ]"; + "vertexHashes.put('1234', tMap)\n" + + "[ 'L': L, 'result': vertexHashes ]"; runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); runner.setProperty("tMap", "/tMap"); runner.setProperty("L", "/L"); @@ -181,6 +182,11 @@ public class ExecuteGraphQueryRecordTest { List> expected = mapper.readValue(expectedRaw, List.class); List> content = mapper.readValue(contentRaw, List.class); + assertEquals(expected.size(), content.size()); + for (int x = 0; x < content.size(); x++) { + assertEquals(expected.get(x), content.get(x)); + } + return expected.equals(content); }