NIFI-972 attribute to indicate rows count and cleanup

Signed-off-by: Toivo Adams <toivo.adams@gmail.com>
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Toivo Adams 2015-10-23 12:44:27 +03:00 committed by Mark Payne
parent ba3225fe92
commit a9e5325047
2 changed files with 8 additions and 14 deletions

View File

@ -53,9 +53,13 @@ import org.apache.nifi.util.StopWatch;
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
"a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
"If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
"select query.")
"select query. " +
"FlowFile attribute 'executesql.row.count' indicates how many rows were selected."
)
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -153,6 +157,9 @@ public class ExecuteSQL extends AbstractProcessor {
}
});
// set attribute how many rows were selected
outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
logger.info("Transferred {} to 'success'", new Object[] { outgoing });
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));

View File

@ -134,53 +134,41 @@ public class JdbcCommon {
case NCHAR:
case NVARCHAR:
case VARCHAR:
// builder.name(meta.getColumnName(i)).type().stringType().noDefault();
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
// builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case BOOLEAN:
// builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
break;
case INTEGER:
case SMALLINT:
case TINYINT:
// builder.name(meta.getColumnName(i)).type().intType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
break;
case BIGINT:
// builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
break;
// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
case FLOAT:
case REAL:
// builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
break;
case DOUBLE:
// builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;
@ -188,7 +176,6 @@ public class JdbcCommon {
case DATE:
case TIME:
case TIMESTAMP:
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
break;