NIFI-7906 Fixed a bug in how record paths were handled.

NIFI-7906 Fixed bugs in unit tests.

NIFI-7906 Updated a few things based on a code review.

NIFI-7906: Fixed typo in record.count attribute

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4739
This commit is contained in:
Mike Thomsen 2020-12-03 15:50:53 -05:00 committed by Matthew Burgess
parent 5f7558cecf
commit 82c84492ce
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 37 additions and 11 deletions

View File

@ -140,7 +140,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
SUCCESS, FAILURE, GRAPH SUCCESS, FAILURE, GRAPH
))); )));
public static final String RECORD_COUNT = "records.count"; public static final String RECORD_COUNT = "record.count";
public static final String GRAPH_OPERATION_TIME = "graph.operations.took"; public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
private volatile RecordPathCache recordPathCache; private volatile RecordPathCache recordPathCache;
@ -167,12 +167,25 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
recordPathCache = new RecordPathCache(100); recordPathCache = new RecordPathCache(100);
} }
private List<Object> getRecordValue(Record record, RecordPath recordPath){ private Object getRecordValue(Record record, RecordPath recordPath){
final RecordPathResult result = recordPath.evaluate(record); final RecordPathResult result = recordPath.evaluate(record);
return result.getSelectedFields() final List<FieldValue> values = result.getSelectedFields().collect(Collectors.toList());
.filter(fv -> fv.getValue() != null) if (values != null && !values.isEmpty()) {
.map(FieldValue::getValue) if (values.size() == 1) {
.collect( Collectors.toList()); Object raw = values.get(0).getValue();
if (raw != null && raw.getClass().isArray()) {
Object[] arr = (Object[]) raw;
raw = Arrays.asList(arr);
}
return raw;
} else {
return values.stream().map(fv -> fv.getValue()).collect(Collectors.toList());
}
} else {
return null;
}
} }
@Override @Override
@ -211,6 +224,7 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
Record record; Record record;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
failedWriter.beginRecordSet(); failedWriter.beginRecordSet();
int records = 0;
while ((record = reader.nextRecord()) != null) { while ((record = reader.nextRecord()) != null) {
FlowFile graph = session.create(input); FlowFile graph = session.create(input);
@ -223,6 +237,9 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
} }
dynamicPropertyMap.putAll(input.getAttributes()); dynamicPropertyMap.putAll(input.getAttributes());
if (getLogger().isDebugEnabled()) {
getLogger().debug("Dynamic Properties: {}", new Object[]{dynamicPropertyMap});
}
List<Map<String, Object>> graphResponses = new ArrayList<>(executeQuery(recordScript, dynamicPropertyMap)); List<Map<String, Object>> graphResponses = new ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
OutputStream graphOutputStream = session.write(graph); OutputStream graphOutputStream = session.write(graph);
@ -231,15 +248,18 @@ public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
graphOutputStream.close(); graphOutputStream.close();
session.transfer(graph, GRAPH); session.transfer(graph, GRAPH);
} catch (Exception e) { } catch (Exception e) {
getLogger().error("Error processing record at index " + records, e);
// write failed records to a flowfile destined for the failure relationship // write failed records to a flowfile destined for the failure relationship
failedWriter.write(record); failedWriter.write(record);
session.remove(graph); session.remove(graph);
} finally {
records++;
} }
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
delta = (end - start) / 1000; delta = (end - start) / 1000;
if (getLogger().isDebugEnabled()){ if (getLogger().isDebugEnabled()){
getLogger().debug(String.format("Took %s seconds.", delta)); getLogger().debug(String.format("Took %s seconds.\nHandled %d records", delta, records));
} }
failedWriteResult = failedWriter.finishRecordSet(); failedWriteResult = failedWriter.finishRecordSet();
failedWriter.flush(); failedWriter.flush();

View File

@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class ExecuteGraphQueryRecordTest { public class ExecuteGraphQueryRecordTest {
@ -67,7 +68,7 @@ public class ExecuteGraphQueryRecordTest {
byte[] json = JsonOutput.toJson(test).getBytes(); byte[] json = JsonOutput.toJson(test).getBytes();
String submissionScript; String submissionScript;
submissionScript = "[ 'M': M[0] ]"; submissionScript = "[ 'M': M ]";
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
runner.setProperty("M", "/M"); runner.setProperty("M", "/M");
@ -98,7 +99,7 @@ public class ExecuteGraphQueryRecordTest {
byte[] json = JsonOutput.toJson(test).getBytes(); byte[] json = JsonOutput.toJson(test).getBytes();
String submissionScript = "[ " + String submissionScript = "[ " +
"'M': M[0] " + "'M': M " +
"]"; "]";
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
@ -131,8 +132,8 @@ public class ExecuteGraphQueryRecordTest {
byte[] json = JsonOutput.toJson(test).getBytes(); byte[] json = JsonOutput.toJson(test).getBytes();
String submissionScript = "Map<String, Object> vertexHashes = new HashMap()\n" + String submissionScript = "Map<String, Object> vertexHashes = new HashMap()\n" +
"vertexHashes.put('1234', tMap[0])\n" + "vertexHashes.put('1234', tMap)\n" +
"[ 'L': L[0], 'result': vertexHashes ]"; "[ 'L': L, 'result': vertexHashes ]";
runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript); runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, submissionScript);
runner.setProperty("tMap", "/tMap"); runner.setProperty("tMap", "/tMap");
runner.setProperty("L", "/L"); runner.setProperty("L", "/L");
@ -181,6 +182,11 @@ public class ExecuteGraphQueryRecordTest {
List<Map<String, Object>> expected = mapper.readValue(expectedRaw, List.class); List<Map<String, Object>> expected = mapper.readValue(expectedRaw, List.class);
List<Map<String, Object>> content = mapper.readValue(contentRaw, List.class); List<Map<String, Object>> content = mapper.readValue(contentRaw, List.class);
assertEquals(expected.size(), content.size());
for (int x = 0; x < content.size(); x++) {
assertEquals(expected.get(x), content.get(x));
}
return expected.equals(content); return expected.equals(content);
} }