diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index c61a31802f..bb69c65cea 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -126,4 +126,17 @@ public interface ProcessContext { */ Set getAvailableRelationships(); + /** + * @return true if the processor has one or more incoming connections, + * false otherwise + */ + boolean hasIncomingConnection(); + + /** + * @param relationship a relationship to check for connections + * @return true if the relationship has one or more outbound connections, + * false otherwise + */ + boolean hasConnection(Relationship relationship); + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 8b3ed9564d..a350ecbe10 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -48,7 +48,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean yieldCalled = false; private boolean enableExpressionValidation = false; private boolean allowExpressionValidation = true; + private boolean incomingConnection = true; + private volatile Set connections = new HashSet<>(); private volatile Set unavailableRelationships = new HashSet<>(); /** @@ -288,4 +290,35 @@ public class MockProcessContext extends MockControllerServiceLookup implements S public Set getUnavailableRelationships() { return unavailableRelationships; } + + @Override + public boolean hasIncomingConnection() { + return incomingConnection; + } + + public void setIncomingConnection(final boolean hasIncomingConnection) { + this.incomingConnection = hasIncomingConnection; + } + + @Override + public boolean hasConnection(Relationship relationship) { + return this.connections.contains(relationship); + } + + public void addConnection(final Relationship relationship) { + this.connections.add(relationship); + } + + public void removeConnection(final Relationship relationship) { + this.connections.remove(relationship); + } + + public void setConnections(final Set connections) { + if (connections == null) { + this.connections = Collections.emptySet(); + } else { + this.connections = Collections.unmodifiableSet(connections); + } + } + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 80265a74e4..eeeff61536 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -518,6 +518,31 @@ public class StandardProcessorTestRunner implements TestRunner { setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build()); } + @Override + public void setIncomingConnection(boolean hasIncomingConnection) { + context.setIncomingConnection(hasIncomingConnection); + } + + @Override + public void addConnection(Relationship relationship) { + context.addConnection(relationship); + } + + @Override + public void addConnection(String relationshipName) { + addConnection(new Relationship.Builder().name(relationshipName).build()); + } + + @Override + public void removeConnection(Relationship relationship) { + context.removeConnection(relationship); + } + + @Override + public void removeConnection(String relationshipName) { + removeConnection(new Relationship.Builder().name(relationshipName).build()); + } + @Override public void addControllerService(final String identifier, final ControllerService service) throws InitializationException { addControllerService(identifier, service, new HashMap()); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 6bc3442af7..6e66bfe738 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -488,6 +488,43 @@ public interface TestRunner { */ void setRelationshipUnavailable(String relationshipName); + /** + * Indicates to the framework that the configured processor has one or more + * incoming connections. + * + * @param hasIncomingConnection whether or not the configured processor has an incoming connection + */ + void setIncomingConnection(boolean hasIncomingConnection); + + /** + * Indicates to the Framework that the configured processor has a connection for the given Relationship. + * + * @param relationship that has a connection + */ + void addConnection(Relationship relationship); + + /** + * Indicates to the Framework that the configured processor has a connection for the + * Relationship with the given name. + * + * @param relationshipName name of relationship that has a connection + */ + void addConnection(String relationshipName); + + /** + * Removes the connection for the given Relationship from the configured processor. + * + * @param relationship to remove + */ + void removeConnection(Relationship relationship); + + /** + * Removes the connection for the relationship with the given name from the configured processor. + * + * @param relationshipName name of the relationship to remove + */ + void removeConnection(String relationshipName); + /** * Adds the given {@link ControllerService} to this TestRunner so that the * configured Processor can access it using the given diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java index 76d7d3d447..3238d4ac0a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java @@ -82,4 +82,16 @@ public class MockProcessContext implements ProcessContext { public Set getAvailableRelationships() { return Collections.emptySet(); } + + @Override + public boolean hasIncomingConnection() { + return true; + } + + @Override + public boolean hasConnection(Relationship relationship) { + return false; + } + + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 66967baf2b..f2a83d0bf4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -95,4 +95,5 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen * @param ignoredReferences to ignore */ public abstract void verifyCanStart(Set ignoredReferences); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 7617e7c8ed..da7162e91f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -190,4 +190,17 @@ public class ConnectableProcessContext implements ProcessContext { } return new HashSet<>(connectable.getRelationships()); } + + @Override + public boolean hasIncomingConnection() { + return connectable.hasIncomingConnection(); + } + + @Override + public boolean hasConnection(Relationship relationship) { + Set connections = connectable.getConnections(relationship); + return connections != null && !connections.isEmpty(); + } + + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index d1bfacf6bd..76849bdae3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -179,4 +179,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService return controllerServiceProvider.getControllerServiceName(serviceIdentifier); } + @Override + public boolean hasIncomingConnection() { + return procNode.hasIncomingConnection(); + } + + @Override + public boolean hasConnection(Relationship relationship) { + Set connections = procNode.getConnections(relationship); + return connections != null && !connections.isEmpty(); + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index ef737287c0..27d1264516 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -111,4 +111,14 @@ public class StandardSchedulingContext implements SchedulingContext { public Set getAvailableRelationships() { return processContext.getAvailableRelationships(); } + + @Override + public boolean hasIncomingConnection() { + return processContext.hasIncomingConnection(); + } + + @Override + public boolean hasConnection(Relationship relationship) { + return processContext.hasConnection(relationship); + } } diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-nar/pom.xml b/nifi-nar-bundles/nifi-image-bundle/nifi-image-nar/pom.xml index 1c60968c1e..5527bfd289 100644 --- a/nifi-nar-bundles/nifi-image-bundle/nifi-image-nar/pom.xml +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-nar/pom.xml @@ -32,6 +32,12 @@ nifi-image-processors 0.3.1-SNAPSHOT + + org.apache.nifi + nifi-image-viewer + 0.3.1-SNAPSHOT + war + diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/pom.xml b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/pom.xml new file mode 100755 index 0000000000..d8b20daee3 --- /dev/null +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/pom.xml @@ -0,0 +1,48 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-image-bundle + 0.3.1-SNAPSHOT + + nifi-image-viewer + NiFi image viewer + war + + + org.apache.nifi + nifi-api + + + javax.servlet.jsp + javax.servlet.jsp-api + + + javax.el + javax.el-api + + + javax.servlet.jsp.jstl + javax.servlet.jsp.jstl-api + + + javax.servlet + javax.servlet-api + + + diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/java/org/apache/nifi/web/ImageViewerController.java b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/java/org/apache/nifi/web/ImageViewerController.java new file mode 100755 index 0000000000..a3ba1cd61d --- /dev/null +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/java/org/apache/nifi/web/ImageViewerController.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web; + +import java.io.IOException; +import java.io.PrintWriter; + +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@WebServlet(name = "ImageViewer", urlPatterns = {"/view-content"}) +public class ImageViewerController extends HttpServlet { + + /** + * Handles generating markup for viewing an image. + * + * @param request servlet request + * @param response servlet response + * @throws ServletException if a servlet-specific error occurs + * @throws IOException if an I/O error occurs + */ + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + final ViewableContent content = (ViewableContent) request.getAttribute(ViewableContent.CONTENT_REQUEST_ATTRIBUTE); + + // handle images + if ("image/png".equals(content.getContentType()) || "image/jpeg".equals(content.getContentType()) || "image/gif".equals(content.getContentType())) { + // defer to the jsp + request.getRequestDispatcher("/WEB-INF/jsp/image.jsp").include(request, response); + } else { + final PrintWriter out = response.getWriter(); + out.println("Unexpected content type: " + content.getContentType()); + } + } +} diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/META-INF/nifi-content-viewer b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/META-INF/nifi-content-viewer new file mode 100755 index 0000000000..71cf2a29e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/META-INF/nifi-content-viewer @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +image/png +image/jpeg +image/gif \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/jsp/image.jsp b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/jsp/image.jsp new file mode 100755 index 0000000000..9dc5e3c4a8 --- /dev/null +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/jsp/image.jsp @@ -0,0 +1,39 @@ +<%-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--%> +<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %> + + +
+ \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/web.xml b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/web.xml new file mode 100755 index 0000000000..bccf333a82 --- /dev/null +++ b/nifi-nar-bundles/nifi-image-bundle/nifi-image-viewer/src/main/webapp/WEB-INF/web.xml @@ -0,0 +1,29 @@ + + + + nifi-image-viewer + + ImageViewer + org.apache.nifi.web.ImageViewerController + + + ImageViewer + /view-content + + + view-content + + diff --git a/nifi-nar-bundles/nifi-image-bundle/pom.xml b/nifi-nar-bundles/nifi-image-bundle/pom.xml index a254fd7910..2a5836f6bc 100644 --- a/nifi-nar-bundles/nifi-image-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-image-bundle/pom.xml @@ -29,6 +29,7 @@ nifi-image-processors nifi-image-nar + nifi-image-viewer diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java index 6eb287b57b..560ad34921 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrContentStream.java @@ -161,7 +161,7 @@ public class PutSolrContentStream extends SolrProcessor { final ObjectHolder connectionError = new ObjectHolder<>(null); final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); - final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue(); final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index eaa009c368..336b28721b 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.NamedList; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -37,6 +38,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; @@ -229,6 +232,27 @@ public class TestPutSolrContentStream { Assert.assertEquals(0, qResponse.getResults().getNumFound()); } + @Test + public void testCollectionExpressionLanguage() throws IOException, SolrServerException { + final String collection = "collection1"; + final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection); + + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:9983"); + runner.setProperty(PutSolrContentStream.COLLECTION, "${solr.collection}"); + + final Map attributes = new HashMap<>(); + attributes.put("solr.collection", collection); + + try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { + runner.enqueue(fileIn, attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_SUCCESS, 1); + } + } + @Test public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException { final Throwable throwable = new SolrServerException("Invalid Document"); @@ -329,6 +353,36 @@ public class TestPutSolrContentStream { runner.assertValid(); } + // Override the createSolrClient method to inject a custom SolrClient. + private class CollectionVerifyingProcessor extends PutSolrContentStream { + + private SolrClient mockSolrClient; + + private final String expectedCollection; + + public CollectionVerifyingProcessor(final String expectedCollection) { + this.expectedCollection = expectedCollection; + } + + @Override + protected SolrClient createSolrClient(ProcessContext context) { + mockSolrClient = new SolrClient() { + @Override + public NamedList request(SolrRequest solrRequest, String s) throws SolrServerException, IOException { + Assert.assertEquals(expectedCollection, solrRequest.getParams().get(PutSolrContentStream.COLLECTION_PARAM_NAME)); + return new NamedList<>(); + } + + @Override + public void shutdown() { + + } + + }; + return mockSolrClient; + } + + } // Override the createSolrClient method to inject a Mock. private class ExceptionThrowingProcessor extends PutSolrContentStream { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index f3be494e76..45fd1a8732 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -50,7 +50,10 @@ import org.apache.nifi.util.StopWatch; @EventDriven @Tags({ "sql", "select", "jdbc", "query", "database" }) @CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." - + " Streaming is used so arbitrarily large result sets are supported.") + + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query.") public class ExecuteSQL extends AbstractProcessor { // Relationships @@ -116,9 +119,12 @@ public class ExecuteSQL extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile incoming = session.get(); - if (incoming == null) { - return; + FlowFile incoming = null; + if (context.hasIncomingConnection()) { + incoming = session.get(); + if (incoming == null) { + return; + } } final ProcessorLog logger = getLogger(); @@ -133,7 +139,8 @@ public class ExecuteSQL extends AbstractProcessor { final Statement st = con.createStatement()) { st.setQueryTimeout(queryTimeout); // timeout in seconds final LongHolder nrOfRows = new LongHolder(0L); - final FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { + FlowFile outgoing = (incoming == null ? session.create() : incoming); + outgoing = session.write(outgoing, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { try { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index c8f9bbe07e..e9258df743 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -641,7 +641,10 @@ public class MergeContent extends BinFiles { if (wrapper != null) { final FlowFile flowFile = wrapper.getFlowFile(); if (flowFile != null) { - property = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue().getBytes(); + final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue(); + if (value != null) { + property = value.getBytes(); + } } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 4166d94657..12a5cd4ad9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -52,7 +52,7 @@ public class TestDetectDuplicate { System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug"); - LOGGER = LoggerFactory.getLogger(TestListenUDP.class); + LOGGER = LoggerFactory.getLogger(TestDetectDuplicate.class); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index efa2705ef1..95a521037c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -41,6 +41,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.fusesource.hawtbuf.ByteArrayInputStream; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -61,32 +62,69 @@ public class TestExecuteSQL { final static String DB_LOCATION = "target/db"; + final static String QUERY_WITH_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID = ${person.id}"; + + final static String QUERY_WITHOUT_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID = 10"; + + @BeforeClass - public static void setup() { + public static void setupClass() { System.setProperty("derby.stream.error.file", "target/derby.log"); } + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(ExecuteSQL.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); + } + + @Test + public void testIncomingConnectionWithNoFlowFile() throws InitializationException { + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons"); + runner.run(); + runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0); + runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0); + } + + @Test + public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { + runner.setIncomingConnection(false); + invokeOnTrigger(null, QUERY_WITHOUT_EL, false); + } + @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(null); + invokeOnTrigger(null, QUERY_WITH_EL, true); } @Test public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { // Does to seem to have any effect when using embedded Derby - invokeOnTrigger(1); // 1 second max time + invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time } - public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); - - final DBCPService dbcp = new DBCPServiceSimpleImpl(); - final Map dbcpProperties = new HashMap<>(); - - runner.addControllerService("dbcp", dbcp, dbcpProperties); - - runner.enableControllerService(dbcp); - runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); + public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile) + throws InitializationException, ClassNotFoundException, SQLException, IOException { if (queryTimeout != null) { runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); @@ -97,27 +135,21 @@ public class TestExecuteSQL { dbLocation.delete(); // load test data to database - final Connection con = dbcp.getConnection(); + final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection(); TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); LOGGER.info("test data loaded"); // ResultSet size will be 1x2000x1000 = 2 000 000 rows // because of where PER.ID = ${person.id} final int nrOfRows = 2000000; - final String query = "select " - + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" - + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" - + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" - + ", ROW_NUMBER() OVER () as rownr " - + " from persons PER, products PRD, relationships REL" - + " where PER.ID = ${person.id}"; - runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); - // incoming FlowFile content is not used, but attributes are used - final Map attributes = new HashMap(); - attributes.put("person.id", "10"); - runner.enqueue("Hello".getBytes(), attributes); + if (incomingFlowFile) { + // incoming FlowFile content is not used, but attributes are used + final Map attributes = new HashMap(); + attributes.put("person.id", "10"); + runner.enqueue("Hello".getBytes(), attributes); + } runner.run(); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index c53c488bb9..cd06ba060d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -256,6 +256,27 @@ public class TestMergeContent { bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); } + @Test + public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT); + runner.setProperty(MergeContent.HEADER, "@"); + + createFlowFiles(runner); + runner.run(); + + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertContentEquals("@Hello, World!".getBytes("UTF-8")); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + } + @Test public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent());