mirror of https://github.com/apache/nifi.git
NIFI-421 ExecuteProcess back-pressure support, version 1b
Signed-off-by: Toivo Adams <toivo.adams@gmail.com> Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
20f11b1a77
commit
ad98ac50ca
|
@ -116,6 +116,9 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private volatile ExecutorService executor;
|
private volatile ExecutorService executor;
|
||||||
|
private Future<?> longRunningProcess;
|
||||||
|
private AtomicBoolean failure = new AtomicBoolean(false);
|
||||||
|
private volatile ProxyOutputStream proxyOut;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Relationship> getRelationships() {
|
public Set<Relationship> getRelationships() {
|
||||||
|
@ -209,15 +212,105 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
|
if (proxyOut==null)
|
||||||
|
proxyOut = new ProxyOutputStream(getLogger());
|
||||||
|
|
||||||
|
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
|
final List<String> commandStrings = createCommandStrings(context);
|
||||||
|
final String commandString = StringUtils.join(commandStrings, " ");
|
||||||
|
|
||||||
|
if (longRunningProcess == null || longRunningProcess.isDone())
|
||||||
|
try {
|
||||||
|
longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().error("Failed to create process due to {}", new Object[] { ioe });
|
||||||
|
context.yield();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
getLogger().info("Read from long running process");
|
||||||
|
|
||||||
|
if (!isScheduled()) {
|
||||||
|
getLogger().info("User stopped processor; will terminate process immediately");
|
||||||
|
longRunningProcess.cancel(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a FlowFile that we can write to and set the OutputStream for
|
||||||
|
// the FlowFile
|
||||||
|
// as the delegate for the ProxyOuptutStream, then wait until the
|
||||||
|
// process finishes
|
||||||
|
// or until the specified amount of time
|
||||||
|
FlowFile flowFile = session.create();
|
||||||
|
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(final OutputStream flowFileOut) throws IOException {
|
||||||
|
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
|
||||||
|
proxyOut.setDelegate(out);
|
||||||
|
|
||||||
|
if (batchNanos == null) {
|
||||||
|
// we are not creating batches; wait until process
|
||||||
|
// terminates.
|
||||||
|
// NB!!! Maybe get(long timeout, TimeUnit unit) should
|
||||||
|
// be used to avoid waiting forever.
|
||||||
|
try {
|
||||||
|
longRunningProcess.get();
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
} catch (final ExecutionException ee) {
|
||||||
|
getLogger().error("Process execution failed due to {}", new Object[] { ee.getCause() });
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// wait the allotted amount of time.
|
||||||
|
try {
|
||||||
|
TimeUnit.NANOSECONDS.sleep(batchNanos);
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyOut.setDelegate(null); // prevent from writing to this
|
||||||
|
// stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (flowFile.getSize() == 0L) {
|
||||||
|
// If no data was written to the file, remove it
|
||||||
|
session.remove(flowFile);
|
||||||
|
} else if (failure.get()) {
|
||||||
|
// If there was a failure processing the output of the Process,
|
||||||
|
// remove the FlowFile
|
||||||
|
session.remove(flowFile);
|
||||||
|
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
|
||||||
|
} else {
|
||||||
|
// All was good. Generate event and transfer FlowFile.
|
||||||
|
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
|
||||||
|
getLogger().info("Created {} and routed to success", new Object[] { flowFile });
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the session so that the FlowFile is transferred to the next
|
||||||
|
// processor
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<String> createCommandStrings(final ProcessContext context) {
|
||||||
|
|
||||||
final String command = context.getProperty(COMMAND).getValue();
|
final String command = context.getProperty(COMMAND).getValue();
|
||||||
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
|
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
|
||||||
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
|
|
||||||
|
|
||||||
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
|
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
|
||||||
commandStrings.add(command);
|
commandStrings.add(command);
|
||||||
commandStrings.addAll(args);
|
commandStrings.addAll(args);
|
||||||
|
return commandStrings;
|
||||||
|
}
|
||||||
|
|
||||||
final String commandString = StringUtils.join(commandStrings, " ");
|
protected Future<?> launchProcess(final ProcessContext context, final List<String> commandStrings, final Long batchNanos,
|
||||||
|
final ProxyOutputStream proxyOut) throws IOException {
|
||||||
|
|
||||||
|
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
|
||||||
|
|
||||||
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
|
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
|
||||||
final String workingDirName = context.getProperty(WORKING_DIR).getValue();
|
final String workingDirName = context.getProperty(WORKING_DIR).getValue();
|
||||||
|
@ -236,24 +329,15 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
builder.environment().putAll(environment);
|
builder.environment().putAll(environment);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long startNanos = System.nanoTime();
|
getLogger().info("Start creating new Process > {} ", new Object[] { commandStrings });
|
||||||
final Process process;
|
final Process newProcess = builder.redirectErrorStream(redirectErrorStream).start();
|
||||||
try {
|
|
||||||
process = builder.redirectErrorStream(redirectErrorStream).start();
|
|
||||||
} catch (final IOException ioe) {
|
|
||||||
getLogger().error("Failed to create process due to {}", new Object[]{ioe});
|
|
||||||
context.yield();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
|
|
||||||
|
|
||||||
// Submit task to read error stream from process
|
// Submit task to read error stream from process
|
||||||
if (!redirectErrorStream) {
|
if (!redirectErrorStream) {
|
||||||
executor.submit(new Runnable() {
|
executor.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getErrorStream()))) {
|
||||||
while (reader.read() >= 0) {
|
while (reader.read() >= 0) {
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
|
@ -263,19 +347,25 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit task to read output of Process and write to FlowFile.
|
// Submit task to read output of Process and write to FlowFile.
|
||||||
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
|
failure = new AtomicBoolean(false);
|
||||||
final AtomicBoolean failure = new AtomicBoolean(false);
|
|
||||||
final AtomicBoolean finishedCopying = new AtomicBoolean(false);
|
|
||||||
final Future<?> future = executor.submit(new Callable<Object>() {
|
final Future<?> future = executor.submit(new Callable<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object call() throws IOException {
|
public Object call() throws IOException {
|
||||||
try {
|
try {
|
||||||
if (batchNanos == null) {
|
if (batchNanos == null) {
|
||||||
// if we aren't batching, just copy the stream from the process to the flowfile.
|
// if we aren't batching, just copy the stream from the
|
||||||
try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) {
|
// process to the flowfile.
|
||||||
|
try (final BufferedInputStream bufferedIn = new BufferedInputStream(newProcess.getInputStream())) {
|
||||||
final byte[] buffer = new byte[4096];
|
final byte[] buffer = new byte[4096];
|
||||||
int len;
|
int len;
|
||||||
while ((len = bufferedIn.read(buffer)) > 0) {
|
while ((len = bufferedIn.read(buffer)) > 0) {
|
||||||
|
|
||||||
|
// NB!!!! Maybe all data should be read from
|
||||||
|
// input stream in case of !isScheduled() to
|
||||||
|
// avoid subprocess deadlock?
|
||||||
|
// (we just don't write data to proxyOut)
|
||||||
|
// Or because we don't use this subprocess
|
||||||
|
// anymore anyway, we don't care?
|
||||||
if (!isScheduled()) {
|
if (!isScheduled()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -284,12 +374,16 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// we are batching, which means that the output of the process is text. It doesn't make sense to grab
|
// we are batching, which means that the output of the
|
||||||
// arbitrary batches of bytes from some process and send it along as a piece of data, so we assume that
|
// process is text. It doesn't make sense to grab
|
||||||
|
// arbitrary batches of bytes from some process and send
|
||||||
|
// it along as a piece of data, so we assume that
|
||||||
// setting a batch during means text.
|
// setting a batch during means text.
|
||||||
// Also, we don't want that text to get split up in the middle of a line, so we use BufferedReader
|
// Also, we don't want that text to get split up in the
|
||||||
// to read lines of text and write them as lines of text.
|
// middle of a line, so we use BufferedReader
|
||||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
// to read lines of text and write them as lines of
|
||||||
|
// text.
|
||||||
|
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
|
||||||
String line;
|
String line;
|
||||||
|
|
||||||
while ((line = reader.readLine()) != null) {
|
while ((line = reader.readLine()) != null) {
|
||||||
|
@ -305,97 +399,25 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
failure.set(true);
|
failure.set(true);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
finishedCopying.set(true);
|
int exitCode;
|
||||||
|
try {
|
||||||
|
exitCode = newProcess.exitValue();
|
||||||
|
} catch (Exception e) {
|
||||||
|
exitCode = -99999;
|
||||||
|
}
|
||||||
|
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
|
||||||
|
// getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis",
|
||||||
|
// new Object[]{exitCode, flowFileCount, millis});
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// continue to do this loop until both the process has finished and we have finished copying
|
return future;
|
||||||
// the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(),
|
|
||||||
// there can be data buffered on the InputStream; so we will wait until the stream is empty as well.
|
|
||||||
int flowFileCount = 0;
|
|
||||||
while (!finishedCopying.get() || isAlive(process)) {
|
|
||||||
if (!isScheduled()) {
|
|
||||||
getLogger().info("User stopped processor; will terminate process immediately");
|
|
||||||
process.destroy();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
|
|
||||||
// as the delegate for the ProxyOuptutStream, then wait until the process finishes
|
|
||||||
// or until the specified amount of time
|
|
||||||
FlowFile flowFile = session.create();
|
|
||||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(final OutputStream flowFileOut) throws IOException {
|
|
||||||
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
|
|
||||||
proxyOut.setDelegate(out);
|
|
||||||
|
|
||||||
if (batchNanos == null) {
|
|
||||||
// we are not creating batches; wait until process terminates.
|
|
||||||
Integer exitCode = null;
|
|
||||||
while (exitCode == null) {
|
|
||||||
try {
|
|
||||||
exitCode = process.waitFor();
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// wait the allotted amount of time.
|
|
||||||
try {
|
|
||||||
TimeUnit.NANOSECONDS.sleep(batchNanos);
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
proxyOut.setDelegate(null); // prevent from writing to this stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (flowFile.getSize() == 0L) {
|
|
||||||
// If no data was written to the file, remove it
|
|
||||||
session.remove(flowFile);
|
|
||||||
} else if (failure.get()) {
|
|
||||||
// If there was a failure processing the output of the Process, remove the FlowFile
|
|
||||||
session.remove(flowFile);
|
|
||||||
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// All was good. Generate event and transfer FlowFile.
|
|
||||||
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
|
|
||||||
getLogger().info("Created {} and routed to success", new Object[]{flowFile});
|
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
|
||||||
flowFileCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Commit the session so that the FlowFile is transferred to the next processor
|
|
||||||
session.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
final int exitCode;
|
|
||||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
|
||||||
try {
|
|
||||||
exitCode = process.waitFor();
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
getLogger().warn("Process was interrupted before finishing");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
future.get();
|
|
||||||
} catch (final ExecutionException e) {
|
|
||||||
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()});
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
getLogger().error("Interrupted while waiting to copy data form Process to FlowFile");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NB!!! Currently not used, Future<?> longRunningProcess is used to check whether process is done or not.
|
||||||
private boolean isAlive(final Process process) {
|
private boolean isAlive(final Process process) {
|
||||||
// unfortunately, java provides no straight-forward way to test if a Process is alive.
|
// unfortunately, java provides no straight-forward way to test if a Process is alive.
|
||||||
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
|
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
|
||||||
|
|
|
@ -20,11 +20,14 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestExecuteProcess {
|
public class TestExecuteProcess {
|
||||||
|
@ -58,6 +61,7 @@ public class TestExecuteProcess {
|
||||||
assertEquals("good bye", twoQuotedArg.get(1));
|
assertEquals("good bye", twoQuotedArg.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore // won't run under Windows
|
||||||
@Test
|
@Test
|
||||||
public void testEcho() {
|
public void testEcho() {
|
||||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
||||||
|
@ -75,4 +79,82 @@ public class TestExecuteProcess {
|
||||||
System.out.println(new String(flowFile.toByteArray()));
|
System.out.println(new String(flowFile.toByteArray()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Test
|
||||||
|
public void testBigBinaryInputData() {
|
||||||
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
||||||
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
|
||||||
|
|
||||||
|
String workingDirName = "/var/test";
|
||||||
|
String testFile = "eclipse-java-luna-SR2-win32.zip";
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
|
||||||
|
runner.setProperty(ExecuteProcess.COMMAND, "cmd");
|
||||||
|
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, " /c type " + testFile);
|
||||||
|
runner.setProperty(ExecuteProcess.WORKING_DIR, workingDirName);
|
||||||
|
|
||||||
|
File inFile = new File(workingDirName, testFile);
|
||||||
|
System.out.println(inFile.getAbsolutePath());
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
|
||||||
|
long totalFlowFilesSize = 0;
|
||||||
|
for (final MockFlowFile flowFile : flowFiles) {
|
||||||
|
System.out.println(flowFile);
|
||||||
|
totalFlowFilesSize += flowFile.getSize();
|
||||||
|
// System.out.println(new String(flowFile.toByteArray()));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(inFile.length(), totalFlowFilesSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBigInputSplit() {
|
||||||
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
||||||
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
|
||||||
|
|
||||||
|
String workingDirName = "/var/test";
|
||||||
|
String testFile = "Novo_dicionário_da_língua_portuguesa_by_Cândido_de_Figueiredo.txt";
|
||||||
|
// String testFile = "eclipse-java-luna-SR2-win32.zip";
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
|
||||||
|
runner.setProperty(ExecuteProcess.COMMAND, "cmd");
|
||||||
|
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, " /c type " + testFile);
|
||||||
|
runner.setProperty(ExecuteProcess.WORKING_DIR, workingDirName);
|
||||||
|
runner.setProperty(ExecuteProcess.BATCH_DURATION, "150 millis");
|
||||||
|
|
||||||
|
File inFile = new File(workingDirName, testFile);
|
||||||
|
System.out.println(inFile.getAbsolutePath());
|
||||||
|
|
||||||
|
// runner.run(1,false,true);
|
||||||
|
|
||||||
|
ProcessContext processContext = runner.getProcessContext();
|
||||||
|
|
||||||
|
ExecuteProcess processor = (ExecuteProcess) runner.getProcessor();
|
||||||
|
processor.updateScheduledTrue();
|
||||||
|
processor.setupExecutor(processContext);
|
||||||
|
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
processor.onTrigger(processContext, runner.getProcessSessionFactory());
|
||||||
|
|
||||||
|
// runner.run(5,true,false);
|
||||||
|
|
||||||
|
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
|
||||||
|
long totalFlowFilesSize = 0;
|
||||||
|
for (final MockFlowFile flowFile : flowFiles) {
|
||||||
|
System.out.println(flowFile);
|
||||||
|
totalFlowFilesSize += flowFile.getSize();
|
||||||
|
// System.out.println(new String(flowFile.toByteArray()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertEquals(inFile.length(), totalFlowFilesSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue