NIFI-12011 Added MIME Type to ExecuteStreamCommand and ExecuteProcess

This closes #7660

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-08-29 18:41:03 -04:00 committed by exceptionfactory
parent 8c68e5d8cc
commit 24736f6276
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 53 additions and 11 deletions

View File

@ -33,6 +33,7 @@ import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -83,7 +84,8 @@ import java.util.concurrent.locks.ReentrantLock;
)
@WritesAttributes({
@WritesAttribute(attribute = "command", description = "Executed command"),
@WritesAttribute(attribute = "command.arguments", description = "Arguments of the command")
@WritesAttribute(attribute = "command.arguments", description = "Arguments of the command"),
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type of the output if the 'Output MIME Type' property is set and 'Batch Duration' is not set")
})
public class ExecuteProcess extends AbstractProcessor {
@ -146,6 +148,13 @@ public class ExecuteProcess extends AbstractProcessor {
.defaultValue(" ")
.build();
static final PropertyDescriptor MIME_TYPE = new PropertyDescriptor.Builder()
.name("Output MIME type")
.displayName("Output MIME Type")
.description("Specifies the value to set for the \"mime.type\" attribute. This property is ignored if 'Batch Duration' is set.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -173,6 +182,7 @@ public class ExecuteProcess extends AbstractProcessor {
properties.add(REDIRECT_ERROR_STREAM);
properties.add(WORKING_DIR);
properties.add(ARG_DELIMITER);
properties.add(MIME_TYPE);
return properties;
}
@ -265,6 +275,7 @@ public class ExecuteProcess extends AbstractProcessor {
try {
longRunningProcess.get();
} catch (final InterruptedException ie) {
// Ignore
} catch (final ExecutionException ee) {
getLogger().error("Process execution failed due to {}", new Object[] { ee.getCause() });
}
@ -273,6 +284,7 @@ public class ExecuteProcess extends AbstractProcessor {
try {
TimeUnit.NANOSECONDS.sleep(batchNanos);
} catch (final InterruptedException ie) {
// Ignore
}
}
@ -290,15 +302,20 @@ public class ExecuteProcess extends AbstractProcessor {
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
} else {
// add command and arguments as attribute
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND, command);
// add command, arguments, and MIME type as attributes
Map<String,String> attributes = new HashMap<>();
attributes.put(ATTRIBUTE_COMMAND, command);
if(arguments != null) {
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND_ARGS, arguments);
attributes.put(ATTRIBUTE_COMMAND_ARGS, arguments);
}
if (batchNanos == null && context.getProperty(ExecuteProcess.MIME_TYPE).isSet()) {
attributes.put(CoreAttributes.MIME_TYPE.key(), context.getProperty(ExecuteProcess.MIME_TYPE).getValue());
}
flowFile = session.putAllAttributes(flowFile, attributes);
// All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
getLogger().info("Created {} and routed to success", new Object[] { flowFile });
getLogger().info("Created {} and routed to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -41,6 +41,7 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -159,7 +160,8 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments. Sensitive properties will be masked"),
@WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"),
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")})
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command"),
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type of the output if the 'Output MIME Type' property is set and 'Output Destination Attribute' is not set")})
@Restricted(
restrictions = {
@Restriction(
@ -274,6 +276,14 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue("256")
.build();
static final PropertyDescriptor MIME_TYPE = new PropertyDescriptor.Builder()
.name("Output MIME Type")
.displayName("Output MIME Type")
.description("Specifies the value to set for the \"mime.type\" attribute. This property is ignored if 'Output Destination Attribute' is set.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES;
private static final String MASKED_ARGUMENT = "********";
@ -287,6 +297,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
props.add(IGNORE_STDIN);
props.add(PUT_OUTPUT_IN_ATTRIBUTE);
props.add(PUT_ATTRIBUTE_MAX_LENGTH);
props.add(MIME_TYPE);
PROPERTIES = Collections.unmodifiableList(props);
Set<Relationship> outputStreamRelationships = new HashSet<>();
@ -516,6 +527,9 @@ public class ExecuteStreamCommand extends AbstractProcessor {
attributes.put("execution.status", Integer.toString(exitCode));
attributes.put("execution.command", executeCommand);
attributes.put("execution.command.args", commandArguments);
if (context.getProperty(MIME_TYPE).isSet() && !putToAttribute) {
attributes.put(CoreAttributes.MIME_TYPE.key(), context.getProperty(MIME_TYPE).getValue());
}
outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {

View File

@ -34,6 +34,8 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -64,6 +66,9 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." +
" If Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type of the output if the 'Mime Type' property is set"),
})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class GenerateFlowFile extends AbstractProcessor {

View File

@ -21,6 +21,7 @@ import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.LogMessage;
@ -90,6 +91,7 @@ public class TestExecuteProcess {
runner.setProperty(ExecuteProcess.COMMAND, "echo");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
runner.setProperty(ExecuteProcess.MIME_TYPE, "application/json");
runner.run();
@ -97,6 +99,7 @@ public class TestExecuteProcess {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
System.out.println(new String(flowFile.toByteArray()));
flowFile.assertAttributeNotExists(CoreAttributes.MIME_TYPE.key());
}
}
@ -124,7 +127,7 @@ public class TestExecuteProcess {
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
if(!flowFiles.isEmpty()) {
assertTrue(flowFiles.get(0).getAttribute("command").equals("ping"));
assertEquals("ping", flowFiles.get(0).getAttribute("command"));
}
}
@ -151,7 +154,6 @@ public class TestExecuteProcess {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
// System.out.println(new String(flowFile.toByteArray()));
}
assertEquals(inFile.length(), totalFlowFilesSize);
@ -198,12 +200,10 @@ public class TestExecuteProcess {
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
long totalFlowFilesSize = 0;
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
totalFlowFilesSize += flowFile.getSize();
// System.out.println(new String(flowFile.toByteArray()));
}
// assertEquals(inFile.length(), totalFlowFilesSize);
assertEquals(inFile.length(), totalFlowFilesSize);
}
@Test
@ -233,6 +233,7 @@ public class TestExecuteProcess {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
final List<LogMessage> warnMessages = runner.getLogger().getWarnMessages();

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
@ -85,6 +86,7 @@ public class TestExecuteStreamCommand {
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setProperty(ExecuteStreamCommand.MIME_TYPE.getName(), "text/plain");
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
@ -103,6 +105,7 @@ public class TestExecuteStreamCommand {
String attribute = outputFlowFile.getAttribute("execution.command.args");
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
outputFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
MockFlowFile originalFlowFile = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP).get(0);
assertEquals(outputFlowFile.getAttribute("execution.status"), originalFlowFile.getAttribute("execution.status"));
@ -506,6 +509,7 @@ public class TestExecuteStreamCommand {
File dummy = new File("src/test/resources/hello.txt");
assertTrue(dummy.exists());
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setProperty(ExecuteStreamCommand.MIME_TYPE, "application/json");
controller.enqueue("".getBytes());
if(isWindows()) {
@ -526,6 +530,7 @@ public class TestExecuteStreamCommand {
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
MockFlowFile outputFlowFile = flowFiles.get(0);
outputFlowFile.assertContentEquals("");
outputFlowFile.assertAttributeNotExists(CoreAttributes.MIME_TYPE.key());
String ouput = outputFlowFile.getAttribute("executeStreamCommand.output");
assertTrue(ouput.startsWith("Hello"));
assertEquals("0", outputFlowFile.getAttribute("execution.status"));