NIFI-5834: Restore default PutHiveQL error handling behavior

NIFI-5834: Incorporated review comments

This closes #3179.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matthew Burgess 2018-11-20 17:58:59 -05:00 committed by Koji Kawamura
parent e603c486f4
commit 455e3c1bc8
4 changed files with 179 additions and 6 deletions

View File

@ -149,10 +149,11 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
} else if (e instanceof SQLException) {
// Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes
int errorCode = ((SQLException) e).getErrorCode();
getLogger().debug("Error occurred during Hive operation, Hive returned error code {}", new Object[]{errorCode});
if (errorCode >= 10000 && errorCode < 20000) {
return ErrorTypes.InvalidInput;
} else if (errorCode >= 20000 && errorCode < 30000) {
return ErrorTypes.TemporalFailure;
return ErrorTypes.InvalidInput;
} else if (errorCode >= 30000 && errorCode < 40000) {
return ErrorTypes.TemporalInputFailure;
} else if (errorCode >= 40000 && errorCode < 50000) {
@ -160,7 +161,9 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
// a ProcessException, we'll route to failure via an InvalidInput error type.
return ErrorTypes.InvalidInput;
} else {
return ErrorTypes.UnknownFailure;
// Default unknown errors to TemporalFailure (as they were implemented originally), so they can be routed to failure
// or rolled back depending on the user's setting of Rollback On Failure.
return ErrorTypes.TemporalFailure;
}
} else {
return ErrorTypes.UnknownFailure;

View File

@ -719,6 +719,78 @@ public class TestPutHiveQL {
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
}
@Test
public void testUnknownFailure() throws InitializationException, ProcessException {
final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
final SQLExceptionService service = new SQLExceptionService(null);
service.setErrorCode(2);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.1.value", "1");
attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("hiveql.args.2.value", "Mark");
attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.3.value", "84");
attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because there isn't a valid connection and tables don't exist.
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1);
}
@Test
public void testUnknownFailureRollbackOnFailure() throws InitializationException, ProcessException {
final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class);
final SQLExceptionService service = new SQLExceptionService(null);
service.setErrorCode(0);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.1.value", "1");
attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("hiveql.args.2.value", "Mark");
attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.3.value", "84");
attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
try {
runner.run();
fail("Should throw ProcessException");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
assertEquals(1, runner.getQueueSize().getObjectCount());
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0);
}
/**
* Simple implementation only for testing purposes
*/
@ -758,6 +830,7 @@ public class TestPutHiveQL {
private final HiveDBCPService service;
private int allowedBeforeFailure = 0;
private int successful = 0;
private int errorCode = 30000; // Default to a retryable exception code
SQLExceptionService(final HiveDBCPService service) {
this.service = service;
@ -773,8 +846,7 @@ public class TestPutHiveQL {
try {
if (++successful > allowedBeforeFailure) {
final Connection conn = Mockito.mock(Connection.class);
// Throw a retryable error
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", 20000));
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", errorCode));
return conn;
} else {
return service.getConnection();
@ -789,5 +861,9 @@ public class TestPutHiveQL {
public String getConnectionURL() {
return service != null ? service.getConnectionURL() : null;
}
void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
}
}

View File

@ -148,7 +148,24 @@ public class PutHive3QL extends AbstractHive3QLProcessor {
if (e instanceof SQLNonTransientException) {
return ErrorTypes.InvalidInput;
} else if (e instanceof SQLException) {
return ErrorTypes.TemporalFailure;
// Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes
int errorCode = ((SQLException) e).getErrorCode();
getLogger().debug("Error occurred during Hive operation, Hive returned error code {}", new Object[]{errorCode});
if (errorCode >= 10000 && errorCode < 20000) {
return ErrorTypes.InvalidInput;
} else if (errorCode >= 20000 && errorCode < 30000) {
return ErrorTypes.InvalidInput;
} else if (errorCode >= 30000 && errorCode < 40000) {
return ErrorTypes.TemporalInputFailure;
} else if (errorCode >= 40000 && errorCode < 50000) {
// These are unknown errors (to include some parse errors), but rather than generating an UnknownFailure which causes
// a ProcessException, we'll route to failure via an InvalidInput error type.
return ErrorTypes.InvalidInput;
} else {
// Default unknown errors to TemporalFailure (as they were implemented originally), so they can be routed to failure
// or rolled back depending on the user's setting of Rollback On Failure.
return ErrorTypes.TemporalFailure;
}
} else {
return ErrorTypes.UnknownFailure;
}

View File

@ -719,6 +719,78 @@ public class TestPutHive3QL {
runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0);
}
@Test
public void testUnknownFailure() throws InitializationException, ProcessException {
final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class);
final SQLExceptionService service = new SQLExceptionService(null);
service.setErrorCode(2);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.1.value", "1");
attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("hiveql.args.2.value", "Mark");
attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.3.value", "84");
attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
runner.run();
// should fail because there isn't a valid connection and tables don't exist.
runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 1);
}
@Test
public void testUnknownFailureRollbackOnFailure() throws InitializationException, ProcessException {
final TestRunner runner = TestRunners.newTestRunner(PutHive3QL.class);
final SQLExceptionService service = new SQLExceptionService(null);
service.setErrorCode(0);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutHive3QL.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
"UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.1.value", "1");
attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR));
attributes.put("hiveql.args.2.value", "Mark");
attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.3.value", "84");
attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER));
attributes.put("hiveql.args.4.value", "1");
runner.enqueue(sql.getBytes(), attributes);
try {
runner.run();
fail("Should throw ProcessException");
} catch (AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
}
assertEquals(1, runner.getQueueSize().getObjectCount());
runner.assertAllFlowFilesTransferred(PutHive3QL.REL_RETRY, 0);
}
/**
* Simple implementation only for testing purposes
*/
@ -758,6 +830,7 @@ public class TestPutHive3QL {
private final Hive3DBCPService service;
private int allowedBeforeFailure = 0;
private int successful = 0;
private int errorCode = 30000; // Default to a retryable exception code
SQLExceptionService(final Hive3DBCPService service) {
this.service = service;
@ -773,7 +846,7 @@ public class TestPutHive3QL {
try {
if (++successful > allowedBeforeFailure) {
final Connection conn = Mockito.mock(Connection.class);
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException"));
Mockito.when(conn.prepareStatement(Mockito.any(String.class))).thenThrow(new SQLException("Unit Test Generated SQLException", "42000", errorCode));
return conn;
} else {
return service.getConnection();
@ -788,5 +861,9 @@ public class TestPutHive3QL {
public String getConnectionURL() {
return service != null ? service.getConnectionURL() : null;
}
void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
}
}