NIFI-5744: Put exception message to attribute while ExecuteSQL fail

This closes #3107.

Signed-off-by: Peter Wicks <patricker@gmail.com>
This commit is contained in:
yjhyjhyjh0 2018-10-24 21:22:06 +08:00 committed by Peter Wicks
parent 7bcf9fcb5d
commit 3c7012ffda
No known key found for this signature in database
GPG Key ID: 79ABE9BA9C7AB3CD
5 changed files with 55 additions and 1 deletions

View File

@ -61,6 +61,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
@ -402,6 +403,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
new Object[]{selectQuery, e});
context.yield();
}
session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
}
}

View File

@ -87,6 +87,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
+ "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."),
@WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ "the zero based index of this result set."),
@WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes "
+ "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "

View File

@ -79,6 +79,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
@WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ "the zero based index of this result set."),
@WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes "
+ "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "

View File

@ -52,6 +52,7 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -463,7 +464,7 @@ public class TestExecuteSQL {
ResultSet rs = mock(ResultSet.class);
when(statement.getResultSet()).thenReturn(rs);
// Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created.
when(rs.getMetaData()).thenThrow(SQLException.class);
when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed"));
runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
@ -475,6 +476,10 @@ public class TestExecuteSQL {
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1);
runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
// Assert exception message has been put to flow file attribute
MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
Assert.assertEquals("java.sql.SQLException: test execute statement failed",failedFlowFile.getAttribute(ExecuteSQL.RESULT_ERROR_MESSAGE));
}
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String,String> attrs, final boolean setQueryProperty)

View File

@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -38,6 +39,8 @@ import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
@ -45,6 +48,10 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestExecuteSQLRecord {
@ -350,6 +357,42 @@ public class TestExecuteSQLRecord {
assertEquals(durationTime, fetchTime + executionTime);
}
@SuppressWarnings("unchecked")
@Test
public void testWithSqlExceptionErrorProcessingResultSet() throws Exception {
DBCPService dbcp = mock(DBCPService.class);
Connection conn = mock(Connection.class);
when(dbcp.getConnection(any(Map.class))).thenReturn(conn);
when(dbcp.getIdentifier()).thenReturn("mockdbcp");
PreparedStatement statement = mock(PreparedStatement.class);
when(conn.prepareStatement(anyString())).thenReturn(statement);
when(statement.execute()).thenReturn(true);
ResultSet rs = mock(ResultSet.class);
when(statement.getResultSet()).thenReturn(rs);
// Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created.
when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed"));
runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "mockdbcp");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.enableControllerService(recordWriter);
runner.setIncomingConnection(true);
runner.enqueue("SELECT 1");
runner.run();
runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1);
runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
// Assert exception message has been put to flow file attribute
MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).get(0);
Assert.assertEquals("java.sql.SQLException: test execute statement failed", failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE));
}
@Test
public void testPreQuery() throws Exception {
// remove previous test database, if any