mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-829' into develop
This commit is contained in:
commit
d328ca0a48
|
@ -331,6 +331,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
private final ProcessSession session;
|
private final ProcessSession session;
|
||||||
|
|
||||||
private final long txId = txIdGenerator.incrementAndGet();
|
private final long txId = txIdGenerator.incrementAndGet();
|
||||||
|
private final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
private FlowFile request;
|
private FlowFile request;
|
||||||
private FlowFile response;
|
private FlowFile response;
|
||||||
|
@ -482,7 +483,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
// and the status codes.
|
// and the status codes.
|
||||||
if (isSuccess()) {
|
if (isSuccess()) {
|
||||||
// clone the flowfile to capture the response
|
// clone the flowfile to capture the response
|
||||||
response = session.clone(request);
|
response = session.create(request);
|
||||||
|
|
||||||
// write the status attributes
|
// write the status attributes
|
||||||
response = writeStatusAttributes(response);
|
response = writeStatusAttributes(response);
|
||||||
|
@ -495,10 +496,11 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||||
// can potentially be null in edge cases
|
// can potentially be null in edge cases
|
||||||
if (is != null) {
|
if (is != null) {
|
||||||
response = session.importFrom(is, response);
|
response = session.importFrom(is, response);
|
||||||
}
|
|
||||||
|
|
||||||
// invoke provenance events
|
// emit provenance event
|
||||||
session.getProvenanceReporter().receive(response, conn.getURL().toExternalForm());
|
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
|
session.getProvenanceReporter().modifyContent(response, "Updated content with data received from " + conn.getURL().toExternalForm(), millis);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.apache.commons.codec.binary.Base64.encodeBase64;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
|
@ -23,6 +28,7 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -32,6 +38,8 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.processors.standard.InvokeHTTP.Config;
|
import org.apache.nifi.processors.standard.InvokeHTTP.Config;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||||
import org.apache.nifi.ssl.StandardSSLContextService;
|
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
@ -42,12 +50,6 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import static org.apache.commons.codec.binary.Base64.encodeBase64;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -217,6 +219,21 @@ public class TestInvokeHTTP {
|
||||||
bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
bundle1.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
||||||
bundle1.assertAttributeEquals("Foo", "Bar");
|
bundle1.assertAttributeEquals("Foo", "Bar");
|
||||||
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
|
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
|
||||||
|
|
||||||
|
final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
|
||||||
|
assertEquals(2, provEvents.size());
|
||||||
|
boolean forkEvent = false;
|
||||||
|
boolean contentModEvent = false;
|
||||||
|
for (final ProvenanceEventRecord event : provEvents) {
|
||||||
|
if (event.getEventType() == ProvenanceEventType.FORK) {
|
||||||
|
forkEvent = true;
|
||||||
|
} else if (event.getEventType() == ProvenanceEventType.CONTENT_MODIFIED) {
|
||||||
|
contentModEvent = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(forkEvent);
|
||||||
|
assertTrue(contentModEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue