Removing the batched get of flowfiles to utilize the framework provided batching support

This commit is contained in:
Aldrin Piri 2015-02-20 16:05:16 -05:00
parent 2862771235
commit 81234f3a6d
1 changed files with 55 additions and 60 deletions

View File

@ -151,8 +151,8 @@ public class EvaluateJsonPath extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
List<FlowFile> flowFiles = processSession.get(50);
if (flowFiles.isEmpty()) {
FlowFile flowFile = processSession.get();
if (flowFile == null) {
return;
}
@ -175,66 +175,61 @@ public class EvaluateJsonPath extends AbstractProcessor {
returnType = destination.equals(DESTINATION_CONTENT) ? RETURN_TYPE_JSON : RETURN_TYPE_SCALAR;
}
flowFileLoop:
for (FlowFile flowFile : flowFiles) {
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile);
final DocumentContext documentContext = JsonUtils.validateAndEstablishJsonContext(processSession, flowFile);
if (documentContext == null) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
processSession.transfer(flowFile, REL_FAILURE);
continue flowFileLoop;
}
final Map<String, String> jsonPathResults = new HashMap<>();
jsonPathEvalLoop:
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
try {
Object result = documentContext.read(jsonPathExp);
if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
processSession.transfer(flowFile, REL_FAILURE);
continue flowFileLoop;
}
resultHolder.set(result);
} catch (PathNotFoundException e) {
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
if (destination.equals(DESTINATION_ATTRIBUTE)) {
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
continue jsonPathEvalLoop;
} else {
processSession.transfer(flowFile, REL_NO_MATCH);
continue flowFileLoop;
}
}
final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
switch (destination) {
case DESTINATION_ATTRIBUTE:
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
break;
case DESTINATION_CONTENT:
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
}
}
});
break;
}
}
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
processSession.transfer(flowFile, REL_MATCH);
if (documentContext == null) {
logger.error("FlowFile {} did not have valid JSON content.", new Object[]{flowFile});
processSession.transfer(flowFile, REL_FAILURE);
return;
}
final Map<String, String> jsonPathResults = new HashMap<>();
jsonPathEvalLoop:
for (final Map.Entry<String, JsonPath> attributeJsonPathEntry : attributeToJsonPathMap.entrySet()) {
String jsonPathAttrKey = attributeJsonPathEntry.getKey();
JsonPath jsonPathExp = attributeJsonPathEntry.getValue();
final ObjectHolder<Object> resultHolder = new ObjectHolder<>(null);
try {
Object result = documentContext.read(jsonPathExp);
if (returnType.equals(RETURN_TYPE_SCALAR) && !JsonUtils.isJsonScalar(result)) {
logger.error("Unable to return a scalar value for the expression {} for FlowFile {}. Evaluated value was {}. Transferring to {}.",
new Object[]{jsonPathExp.getPath(), flowFile.getId(), result.toString(), REL_FAILURE.getName()});
processSession.transfer(flowFile, REL_FAILURE);
return;
}
resultHolder.set(result);
} catch (PathNotFoundException e) {
logger.warn("FlowFile {} could not find path {} for attribute key {}.", new Object[]{flowFile.getId(), jsonPathExp.getPath(), jsonPathAttrKey}, e);
if (destination.equals(DESTINATION_ATTRIBUTE)) {
jsonPathResults.put(jsonPathAttrKey, StringUtils.EMPTY);
continue jsonPathEvalLoop;
} else {
processSession.transfer(flowFile, REL_NO_MATCH);
return;
}
}
final String resultRepresentation = JsonUtils.getResultRepresentation(resultHolder.get());
switch (destination) {
case DESTINATION_ATTRIBUTE:
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
case DESTINATION_CONTENT:
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(resultRepresentation.getBytes(StandardCharsets.UTF_8));
}
}
});
break;
}
}
flowFile = processSession.putAllAttributes(flowFile, jsonPathResults);
processSession.transfer(flowFile, REL_MATCH);
}
}