Merge pull request #3224 from mattyb149/NIFI-5904

NIFI-5904: Fix PutHive3Streaming handling of RecordReaderFactoryException
This commit is contained in:
Pierre Villard 2018-12-18 09:50:25 +01:00 committed by GitHub
commit c52f880cc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 8 deletions

View File

@ -435,7 +435,11 @@ public class PutHive3Streaming extends AbstractProcessor {
throw new ShouldRetryException(te.getLocalizedMessage(), te); throw new ShouldRetryException(te.getLocalizedMessage(), te);
} }
} catch (RecordReaderFactoryException rrfe) { } catch (RecordReaderFactoryException rrfe) {
throw new ProcessException(rrfe); if (rollbackOnFailure) {
throw new ProcessException(rrfe);
} else {
session.transfer(flowFile, REL_FAILURE);
}
} }
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) { } catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
if (rollbackOnFailure) { if (rollbackOnFailure) {

View File

@ -54,8 +54,10 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
@ -152,18 +154,26 @@ public class TestPutHive3Streaming {
} }
private void configure(final PutHive3Streaming processor, final int numUsers) throws InitializationException { private void configure(final PutHive3Streaming processor, final int numUsers) throws InitializationException {
configure(processor, numUsers, -1); configure(processor, numUsers, false, -1);
} }
private void configure(final PutHive3Streaming processor, final int numUsers, int failAfter) throws InitializationException { private void configure(final PutHive3Streaming processor, final int numUsers, boolean failOnCreateReader, int failAfter) throws InitializationException {
configure(processor, numUsers, failAfter, null); configure(processor, numUsers, failOnCreateReader, failAfter, null);
} }
private void configure(final PutHive3Streaming processor, final int numUsers, final int failAfter, private void configure(final PutHive3Streaming processor, final int numUsers, final boolean failOnCreateReader, final int failAfter,
final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException { final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
MockRecordParser readerFactory = new MockRecordParser(); MockRecordParser readerFactory = new MockRecordParser() {
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
}
return super.createRecordReader(variables, in, logger);
}
};
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) { for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
@ -344,7 +354,7 @@ public class TestPutHive3Streaming {
@Test @Test
public void onTriggerBadInput() throws Exception { public void onTriggerBadInput() throws Exception {
configure(processor, 1, 0); configure(processor, 1, false, 0);
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
@ -356,7 +366,7 @@ public class TestPutHive3Streaming {
@Test @Test
public void onTriggerBadInputRollbackOnFailure() throws Exception { public void onTriggerBadInputRollbackOnFailure() throws Exception {
configure(processor, 1, 0); configure(processor, 1, false, 0);
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
@ -375,6 +385,39 @@ public class TestPutHive3Streaming {
assertEquals(1, runner.getQueueSize().getObjectCount()); assertEquals(1, runner.getQueueSize().getObjectCount());
} }
@Test
public void onTriggerBadCreate() throws Exception {
configure(processor, 1, true, 0);
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1);
}
@Test
public void onTriggerBadCreateRollbackOnFailure() throws Exception {
configure(processor, 1, true, 0);
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true");
runner.enqueue(new byte[0]);
try {
runner.run();
fail("ProcessException should be thrown");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
// Assert incoming FlowFile stays in input queue.
assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test @Test
public void onTriggerMultipleRecordsSingleTransaction() throws Exception { public void onTriggerMultipleRecordsSingleTransaction() throws Exception {