This commit is contained in:
Mark Payne 2015-09-23 15:31:19 -04:00
commit 26f80095b7
23 changed files with 498 additions and 34 deletions

View File

@ -126,4 +126,17 @@ public interface ProcessContext {
*/ */
Set<Relationship> getAvailableRelationships(); Set<Relationship> getAvailableRelationships();
/**
* @return true if the processor has one or more incoming connections,
* false otherwise
*/
boolean hasIncomingConnection();
/**
* @param relationship a relationship to check for connections
* @return true if the relationship has one or more outbound connections,
* false otherwise
*/
boolean hasConnection(Relationship relationship);
} }

View File

@ -48,7 +48,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
private boolean yieldCalled = false; private boolean yieldCalled = false;
private boolean enableExpressionValidation = false; private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true; private boolean allowExpressionValidation = true;
private boolean incomingConnection = true;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
/** /**
@ -288,4 +290,35 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
public Set<Relationship> getUnavailableRelationships() { public Set<Relationship> getUnavailableRelationships() {
return unavailableRelationships; return unavailableRelationships;
} }
@Override
public boolean hasIncomingConnection() {
return incomingConnection;
}
public void setIncomingConnection(final boolean hasIncomingConnection) {
this.incomingConnection = hasIncomingConnection;
}
@Override
public boolean hasConnection(Relationship relationship) {
return this.connections.contains(relationship);
}
public void addConnection(final Relationship relationship) {
this.connections.add(relationship);
}
public void removeConnection(final Relationship relationship) {
this.connections.remove(relationship);
}
public void setConnections(final Set<Relationship> connections) {
if (connections == null) {
this.connections = Collections.emptySet();
} else {
this.connections = Collections.unmodifiableSet(connections);
}
}
} }

View File

@ -518,6 +518,31 @@ public class StandardProcessorTestRunner implements TestRunner {
setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build()); setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build());
} }
@Override
public void setIncomingConnection(boolean hasIncomingConnection) {
context.setIncomingConnection(hasIncomingConnection);
}
@Override
public void addConnection(Relationship relationship) {
context.addConnection(relationship);
}
@Override
public void addConnection(String relationshipName) {
addConnection(new Relationship.Builder().name(relationshipName).build());
}
@Override
public void removeConnection(Relationship relationship) {
context.removeConnection(relationship);
}
@Override
public void removeConnection(String relationshipName) {
removeConnection(new Relationship.Builder().name(relationshipName).build());
}
@Override @Override
public void addControllerService(final String identifier, final ControllerService service) throws InitializationException { public void addControllerService(final String identifier, final ControllerService service) throws InitializationException {
addControllerService(identifier, service, new HashMap<String, String>()); addControllerService(identifier, service, new HashMap<String, String>());

View File

@ -488,6 +488,43 @@ public interface TestRunner {
*/ */
void setRelationshipUnavailable(String relationshipName); void setRelationshipUnavailable(String relationshipName);
/**
* Indicates to the framework that the configured processor has one or more
* incoming connections.
*
* @param hasIncomingConnection whether or not the configured processor has an incoming connection
*/
void setIncomingConnection(boolean hasIncomingConnection);
/**
* Indicates to the Framework that the configured processor has a connection for the given Relationship.
*
* @param relationship that has a connection
*/
void addConnection(Relationship relationship);
/**
* Indicates to the Framework that the configured processor has a connection for the
* Relationship with the given name.
*
* @param relationshipName name of relationship that has a connection
*/
void addConnection(String relationshipName);
/**
* Removes the connection for the given Relationship from the configured processor.
*
* @param relationship to remove
*/
void removeConnection(Relationship relationship);
/**
* Removes the connection for the relationship with the given name from the configured processor.
*
* @param relationshipName name of the relationship to remove
*/
void removeConnection(String relationshipName);
/** /**
* Adds the given {@link ControllerService} to this TestRunner so that the * Adds the given {@link ControllerService} to this TestRunner so that the
* configured Processor can access it using the given * configured Processor can access it using the given

View File

@ -82,4 +82,16 @@ public class MockProcessContext implements ProcessContext {
public Set<Relationship> getAvailableRelationships() { public Set<Relationship> getAvailableRelationships() {
return Collections.emptySet(); return Collections.emptySet();
} }
@Override
public boolean hasIncomingConnection() {
return true;
}
@Override
public boolean hasConnection(Relationship relationship) {
return false;
}
} }

View File

@ -95,4 +95,5 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
* @param ignoredReferences to ignore * @param ignoredReferences to ignore
*/ */
public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
} }

View File

@ -190,4 +190,17 @@ public class ConnectableProcessContext implements ProcessContext {
} }
return new HashSet<>(connectable.getRelationships()); return new HashSet<>(connectable.getRelationships());
} }
@Override
public boolean hasIncomingConnection() {
return connectable.hasIncomingConnection();
}
@Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = connectable.getConnections(relationship);
return connections != null && !connections.isEmpty();
}
} }

View File

@ -179,4 +179,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return controllerServiceProvider.getControllerServiceName(serviceIdentifier); return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
} }
@Override
public boolean hasIncomingConnection() {
return procNode.hasIncomingConnection();
}
@Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = procNode.getConnections(relationship);
return connections != null && !connections.isEmpty();
}
} }

View File

@ -111,4 +111,14 @@ public class StandardSchedulingContext implements SchedulingContext {
public Set<Relationship> getAvailableRelationships() { public Set<Relationship> getAvailableRelationships() {
return processContext.getAvailableRelationships(); return processContext.getAvailableRelationships();
} }
@Override
public boolean hasIncomingConnection() {
return processContext.hasIncomingConnection();
}
@Override
public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship);
}
} }

View File

@ -32,6 +32,12 @@
<artifactId>nifi-image-processors</artifactId> <artifactId>nifi-image-processors</artifactId>
<version>0.3.1-SNAPSHOT</version> <version>0.3.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-image-viewer</artifactId>
<version>0.3.1-SNAPSHOT</version>
<type>war</type>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-image-bundle</artifactId>
<version>0.3.1-SNAPSHOT</version>
</parent>
<artifactId>nifi-image-viewer</artifactId>
<description>NiFi image viewer</description>
<packaging>war</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>javax.servlet.jsp-api</artifactId>
</dependency>
<dependency>
<groupId>javax.el</groupId>
<artifactId>javax.el-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet.jsp.jstl</groupId>
<artifactId>javax.servlet.jsp.jstl-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet(name = "ImageViewer", urlPatterns = {"/view-content"})
public class ImageViewerController extends HttpServlet {
/**
* Handles generating markup for viewing an image.
*
* @param request servlet request
* @param response servlet response
* @throws ServletException if a servlet-specific error occurs
* @throws IOException if an I/O error occurs
*/
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
final ViewableContent content = (ViewableContent) request.getAttribute(ViewableContent.CONTENT_REQUEST_ATTRIBUTE);
// handle images
if ("image/png".equals(content.getContentType()) || "image/jpeg".equals(content.getContentType()) || "image/gif".equals(content.getContentType())) {
// defer to the jsp
request.getRequestDispatcher("/WEB-INF/jsp/image.jsp").include(request, response);
} else {
final PrintWriter out = response.getWriter();
out.println("Unexpected content type: " + content.getContentType());
}
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
image/png
image/jpeg
image/gif

View File

@ -0,0 +1,39 @@
<%--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--%>
<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
<script type="text/javascript" src="../nifi/js/jquery/jquery-2.1.1.min.js"></script>
<style>
#image-holder {
position: absolute;
right: 50px;
bottom: 50px;
left: 100px;
top: 100px;
border: 1px solid #aaa;
overflow: auto;
background-color: #fff;
padding: 4px;
}
</style>
<div id="image-holder"></div>
<script type="text/javascript">
$(document).ready(function() {
var ref = $('#ref').text();
var imgElement = $('<img/>').attr('src', ref);
$('#image-holder').append(imgElement);
});
</script>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
<display-name>nifi-image-viewer</display-name>
<servlet>
<servlet-name>ImageViewer</servlet-name>
<servlet-class>org.apache.nifi.web.ImageViewerController</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>ImageViewer</servlet-name>
<url-pattern>/view-content</url-pattern>
</servlet-mapping>
<welcome-file-list>
<welcome-file>view-content</welcome-file>
</welcome-file-list>
</web-app>

View File

@ -29,6 +29,7 @@
<modules> <modules>
<module>nifi-image-processors</module> <module>nifi-image-processors</module>
<module>nifi-image-nar</module> <module>nifi-image-nar</module>
<module>nifi-image-viewer</module>
</modules> </modules>
</project> </project>

View File

@ -161,7 +161,7 @@ public class PutSolrContentStream extends SolrProcessor {
final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null); final ObjectHolder<Exception> connectionError = new ObjectHolder<>(null);
final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue()); final boolean isSolrCloud = SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue());
final String collection = context.getProperty(COLLECTION_PARAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile));

View File

@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -37,6 +38,8 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
@ -229,6 +232,27 @@ public class TestPutSolrContentStream {
Assert.assertEquals(0, qResponse.getResults().getNumFound()); Assert.assertEquals(0, qResponse.getResults().getNumFound());
} }
@Test
public void testCollectionExpressionLanguage() throws IOException, SolrServerException {
final String collection = "collection1";
final CollectionVerifyingProcessor proc = new CollectionVerifyingProcessor(collection);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSolrContentStream.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(PutSolrContentStream.SOLR_LOCATION, "localhost:9983");
runner.setProperty(PutSolrContentStream.COLLECTION, "${solr.collection}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("solr.collection", collection);
try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) {
runner.enqueue(fileIn, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutSolrContentStream.REL_SUCCESS, 1);
}
}
@Test @Test
public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException { public void testSolrServerExceptionShouldRouteToFailure() throws IOException, SolrServerException {
final Throwable throwable = new SolrServerException("Invalid Document"); final Throwable throwable = new SolrServerException("Invalid Document");
@ -329,6 +353,36 @@ public class TestPutSolrContentStream {
runner.assertValid(); runner.assertValid();
} }
// Override the createSolrClient method to inject a custom SolrClient.
private class CollectionVerifyingProcessor extends PutSolrContentStream {
private SolrClient mockSolrClient;
private final String expectedCollection;
public CollectionVerifyingProcessor(final String expectedCollection) {
this.expectedCollection = expectedCollection;
}
@Override
protected SolrClient createSolrClient(ProcessContext context) {
mockSolrClient = new SolrClient() {
@Override
public NamedList<Object> request(SolrRequest solrRequest, String s) throws SolrServerException, IOException {
Assert.assertEquals(expectedCollection, solrRequest.getParams().get(PutSolrContentStream.COLLECTION_PARAM_NAME));
return new NamedList<>();
}
@Override
public void shutdown() {
}
};
return mockSolrClient;
}
}
// Override the createSolrClient method to inject a Mock. // Override the createSolrClient method to inject a Mock.
private class ExceptionThrowingProcessor extends PutSolrContentStream { private class ExceptionThrowingProcessor extends PutSolrContentStream {

View File

@ -50,7 +50,10 @@ import org.apache.nifi.util.StopWatch;
@EventDriven @EventDriven
@Tags({ "sql", "select", "jdbc", "query", "database" }) @Tags({ "sql", "select", "jdbc", "query", "database" })
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." @CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported.") + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
"a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
"If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
"select query.")
public class ExecuteSQL extends AbstractProcessor { public class ExecuteSQL extends AbstractProcessor {
// Relationships // Relationships
@ -116,9 +119,12 @@ public class ExecuteSQL extends AbstractProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile incoming = session.get(); FlowFile incoming = null;
if (incoming == null) { if (context.hasIncomingConnection()) {
return; incoming = session.get();
if (incoming == null) {
return;
}
} }
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
@ -133,7 +139,8 @@ public class ExecuteSQL extends AbstractProcessor {
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L); final LongHolder nrOfRows = new LongHolder(0L);
final FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { FlowFile outgoing = (incoming == null ? session.create() : incoming);
outgoing = session.write(outgoing, new OutputStreamCallback() {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream out) throws IOException {
try { try {

View File

@ -641,7 +641,10 @@ public class MergeContent extends BinFiles {
if (wrapper != null) { if (wrapper != null) {
final FlowFile flowFile = wrapper.getFlowFile(); final FlowFile flowFile = wrapper.getFlowFile();
if (flowFile != null) { if (flowFile != null) {
property = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue().getBytes(); final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if (value != null) {
property = value.getBytes();
}
} }
} }
} }

View File

@ -52,7 +52,7 @@ public class TestDetectDuplicate {
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate", "debug");
LOGGER = LoggerFactory.getLogger(TestListenUDP.class); LOGGER = LoggerFactory.getLogger(TestDetectDuplicate.class);
} }
@Test @Test

View File

@ -41,6 +41,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.fusesource.hawtbuf.ByteArrayInputStream; import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -61,32 +62,69 @@ public class TestExecuteSQL {
final static String DB_LOCATION = "target/db"; final static String DB_LOCATION = "target/db";
final static String QUERY_WITH_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
final static String QUERY_WITHOUT_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = 10";
@BeforeClass @BeforeClass
public static void setup() { public static void setupClass() {
System.setProperty("derby.stream.error.file", "target/derby.log"); System.setProperty("derby.stream.error.file", "target/derby.log");
} }
private TestRunner runner;
@Before
public void setup() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ExecuteSQL.class);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
}
@Test
public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
runner.run();
runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
}
@Test
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false);
invokeOnTrigger(null, QUERY_WITHOUT_EL, false);
}
@Test @Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null); invokeOnTrigger(null, QUERY_WITH_EL, true);
} }
@Test @Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does to seem to have any effect when using embedded Derby // Does to seem to have any effect when using embedded Derby
invokeOnTrigger(1); // 1 second max time invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time
} }
public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile)
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); throws InitializationException, ClassNotFoundException, SQLException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
if (queryTimeout != null) { if (queryTimeout != null) {
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
@ -97,27 +135,21 @@ public class TestExecuteSQL {
dbLocation.delete(); dbLocation.delete();
// load test data to database // load test data to database
final Connection con = dbcp.getConnection(); final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
LOGGER.info("test data loaded"); LOGGER.info("test data loaded");
// ResultSet size will be 1x2000x1000 = 2 000 000 rows // ResultSet size will be 1x2000x1000 = 2 000 000 rows
// because of where PER.ID = ${person.id} // because of where PER.ID = ${person.id}
final int nrOfRows = 2000000; final int nrOfRows = 2000000;
final String query = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
// incoming FlowFile content is not used, but attributes are used if (incomingFlowFile) {
final Map<String, String> attributes = new HashMap<String, String>(); // incoming FlowFile content is not used, but attributes are used
attributes.put("person.id", "10"); final Map<String, String> attributes = new HashMap<String, String>();
runner.enqueue("Hello".getBytes(), attributes); attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
}
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);

View File

@ -256,6 +256,27 @@ public class TestMergeContent {
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
} }
@Test
public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
runner.setProperty(MergeContent.HEADER, "@");
createFlowFiles(runner);
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertContentEquals("@Hello, World!".getBytes("UTF-8"));
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
}
@Test @Test
public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException { public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); final TestRunner runner = TestRunners.newTestRunner(new MergeContent());