NIFI-4319 - Fixed ArrayIndexOutOfBoundsException in QueryCassandra

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2112
This commit is contained in:
Pierre Villard 2017-08-26 15:51:12 +02:00 committed by Matthew Burgess
parent 3de0b8edff
commit a53a37f9ca
3 changed files with 58 additions and 2 deletions

View File

@ -40,6 +40,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -263,6 +264,10 @@ public class QueryCassandra extends AbstractCassandraProcessor {
// set attribute how many rows were selected // set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
// set mime.type based on output format
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
logger.info("{} contains {} Avro records; transferring to 'success'", logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()}); new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
@ -510,7 +515,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
final int nrOfColumns = (columnDefinitions == null ? 0 : columnDefinitions.size()); final int nrOfColumns = (columnDefinitions == null ? 0 : columnDefinitions.size());
String tableName = "NiFi_Cassandra_Query_Record"; String tableName = "NiFi_Cassandra_Query_Record";
if (nrOfColumns > 0) { if (nrOfColumns > 0) {
String tableNameFromMeta = columnDefinitions.getTable(1); String tableNameFromMeta = columnDefinitions.getTable(0);
if (!StringUtils.isBlank(tableNameFromMeta)) { if (!StringUtils.isBlank(tableNameFromMeta)) {
tableName = tableNameFromMeta; tableName = tableNameFromMeta;
} }

View File

@ -61,7 +61,7 @@ public class CassandraQueryTestUtil {
} }
}); });
when(columnDefinitions.getTable(1)).thenReturn("users"); when(columnDefinitions.getTable(0)).thenReturn("users");
when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() { when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
@ -103,6 +103,43 @@ public class CassandraQueryTestUtil {
return resultSet; return resultSet;
} }
public static ResultSet createMockResultSetOneColumn() throws Exception {
ResultSet resultSet = mock(ResultSet.class);
ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
when(columnDefinitions.size()).thenReturn(1);
when(columnDefinitions.getName(anyInt())).thenAnswer(new Answer<String>() {
List<String> colNames = Arrays.asList("user_id");
@Override
public String answer(InvocationOnMock invocationOnMock) throws Throwable {
return colNames.get((Integer) invocationOnMock.getArguments()[0]);
}
});
when(columnDefinitions.getTable(0)).thenReturn("users");
when(columnDefinitions.getType(anyInt())).thenAnswer(new Answer<DataType>() {
List<DataType> dataTypes = Arrays.asList(DataType.text());
@Override
public DataType answer(InvocationOnMock invocationOnMock) throws Throwable {
return dataTypes.get((Integer) invocationOnMock.getArguments()[0]);
}
});
List<Row> rows = Arrays.asList(
createRow("user1"),
createRow("user2")
);
when(resultSet.iterator()).thenReturn(rows.iterator());
when(resultSet.all()).thenReturn(rows);
when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
return resultSet;
}
public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails, public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails,
List<String> top_places, Map<Date, String> todo, boolean registered, List<String> top_places, Map<Date, String> todo, boolean registered,
float scale, double metric) { float scale, double metric) {
@ -119,4 +156,10 @@ public class CassandraQueryTestUtil {
return row; return row;
} }
public static Row createRow(String user_id) {
Row row = mock(Row.class);
when(row.getString(0)).thenReturn(user_id);
return row;
}
} }

View File

@ -240,6 +240,14 @@ public class QueryCassandraTest {
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE, 1); testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE, 1);
} }
@Test
public void testCreateSchemaOneColumn() throws Exception {
ResultSet rs = CassandraQueryTestUtil.createMockResultSetOneColumn();
Schema schema = QueryCassandra.createSchema(rs);
assertNotNull(schema);
assertEquals(schema.getName(), "users");
}
@Test @Test
public void testCreateSchema() throws Exception { public void testCreateSchema() throws Exception {
ResultSet rs = CassandraQueryTestUtil.createMockResultSet(); ResultSet rs = CassandraQueryTestUtil.createMockResultSet();