Revert "NIFI-11590: Missing Enum data type handling in FlowFileTable"

This reverts commit 4e304ac586.
This commit is contained in:
Peter Turcsanyi 2023-05-26 11:07:45 +02:00
parent 4e304ac586
commit de62c6261f
4 changed files with 1 additions and 337 deletions

View File

@@ -1,167 +0,0 @@
package org.apache.nifi.processors.helloworld;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@Tags("state")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Stateful(scopes = Scope.CLUSTER, description = "")
public class HelloStateProcessor extends AbstractProcessor {

    // Keys used in the cluster-scoped state map.
    private static final String COUNTER_KEY = "counter";
    private static final String TIMESTAMP_KEY = "timestamp";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected void init(ProcessorInitializationContext context) {
        getLogger().info("init");
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        getLogger().info("customValidate");
        return Collections.emptyList();
    }

    /**
     * Seeds an empty cluster state map the first time the processor is scheduled.
     * Only the primary node performs the initialization.
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        getLogger().info("onScheduled");
        if (!getNodeTypeProvider().isPrimary()) {
            return;
        }
        final StateManager manager = context.getStateManager();
        final StateMap current = manager.getState(Scope.CLUSTER);
        if (!current.getStateVersion().isPresent()) {
            // No state has ever been stored: write an empty map as the baseline.
            manager.setState(new HashMap<>(), Scope.CLUSTER);
        }
    }

    @OnUnscheduled
    public void onUnscheduled() {
        getLogger().info("onUnscheduled");
    }

    @OnStopped
    public void onStopped() {
        getLogger().info("onStopped");
    }

    @OnShutdown
    public void onShutdown() {
        getLogger().info("onShutdown");
    }

    /**
     * Increments a counter held in cluster-scoped state for each incoming FlowFile.
     * The FlowFile goes to success when the optimistic state replace wins, to
     * failure when another node updated the state concurrently.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            getLogger().info("onTrigger");

            final FlowFile flowFile = session.get();
            if (flowFile == null) {
                getLogger().info("Null FlowFile");
                return;
            }

            final StateManager stateManager = context.getStateManager();
            final StateMap previousState = stateManager.getState(Scope.CLUSTER);
            final Map<String, String> updatedValues = new HashMap<>(previousState.toMap());

            final String storedCount = updatedValues.get(COUNTER_KEY);
            final int nextCount = (storedCount == null ? 0 : Integer.parseInt(storedCount)) + 1;
            updatedValues.put(COUNTER_KEY, Integer.toString(nextCount));

            // replace() is atomic w.r.t. the version read above; false means we lost the race.
            final boolean replaced = stateManager.replace(previousState, updatedValues, Scope.CLUSTER);
            session.transfer(flowFile, replaced ? REL_SUCCESS : REL_FAILURE);
        } catch (Exception e) {
            getLogger().error("HelloWorldProcessor failure", e);
            throw new ProcessException("HelloWorldProcessor failure", e);
        }
    }
}

View File

@@ -1,157 +0,0 @@
package org.apache.nifi.processors.helloworld;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags("hello")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class HelloWorldProcessor extends AbstractProcessor {

    /** FlowFiles whose content was replaced with the greeting. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .build();

    /** FlowFiles explicitly routed to failure (name attribute == "Failure"). */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected void init(ProcessorInitializationContext context) {
        getLogger().info("init");
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        getLogger().info("customValidate");
        return Collections.emptyList();
    }

    @OnScheduled
    public void onScheduled() {
        getLogger().info("onScheduled");
    }

    @OnUnscheduled
    public void onUnscheduled() {
        getLogger().info("onUnscheduled");
    }

    @OnStopped
    public void onStopped() {
        getLogger().info("onStopped");
    }

    @OnShutdown
    public void onShutdown() {
        getLogger().info("onShutdown");
    }

    /**
     * Writes "Hello &lt;name&gt;!" into the FlowFile content, where &lt;name&gt; comes from
     * the "name" attribute (defaulting to "Anonymous"). Several magic names trigger
     * demo behaviors instead: Exception, Yield, Penalize, Rollback, Failure.
     *
     * @throws ProcessException when the name is "Exception" or any unexpected error occurs;
     *         AbstractProcessor then rolls the session back with penalization
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            getLogger().info("onTrigger");

            FlowFile flowFile = session.get();
            if (flowFile == null) {
                getLogger().info("Null FlowFile");
                return;
            }

            String name = flowFile.getAttribute("name");
            if (name == null) {
                name = "Anonymous";
            }

            if ("Exception".equals(name)) {
                // => session.rollback(true) in AbstractProcessor (if thrown from onTrigger())
                throw new ProcessException("exception");
            }
            if ("Yield".equals(name)) {
                context.yield();
                session.remove(flowFile);
                return;
            }
            if ("Penalize".equals(name)) {
                session.penalize(flowFile);
                session.rollback();
                return;
                // => no effect, will be rolled back; same as the Rollback case below.
                // session.rollback(true) should be used or ProcessException should be thrown.
            }
            if ("Rollback".equals(name)) {
                session.rollback();
                return;
                // => retriggered immediately
            }
            if ("Failure".equals(name)) {
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            final String greeting = String.format("Hello %s!", name);
            getLogger().info(greeting);

            // Fix: encode explicitly as UTF-8 instead of the platform default charset,
            // so the written content is deterministic across JVMs/OSes (pre-Java-18).
            try (OutputStream os = session.write(flowFile)) {
                os.write(greeting.getBytes(StandardCharsets.UTF_8));
            }
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("HelloWorldProcessor failure", e);
            throw new ProcessException("HelloWorldProcessor failure", e);
        }
    }
}

View File

@@ -227,8 +227,6 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
return typeFactory.createJavaType(BigInteger.class);
case DECIMAL:
return typeFactory.createJavaType(BigDecimal.class);
case ENUM:
return typeFactory.createJavaType(Enum.class);
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);

View File

@@ -251,7 +251,7 @@ public class TestQueryRecord {
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(REL_NAME,
"SELECT title, name, jobLevel" +
"SELECT title, name" +
" FROM FLOWFILE" +
" WHERE CARDINALITY(addresses) > 1");
@@ -270,7 +270,6 @@ public class TestQueryRecord {
final Record output = written.get(0);
assertEquals("John Doe", output.getValue("name"));
assertEquals("Software Engineer", output.getValue("title"));
assertEquals(JobLevel.IC2, output.getValue("jobLevel"));
}
@Test
@@ -778,7 +777,6 @@ public class TestQueryRecord {
* {
* "name": "John Doe",
* "title": "Software Engineer",
* "jobLevel": "IC2",
* "age": 40,
* "addresses": [{
* "streetNumber": 4820,
@@ -817,7 +815,6 @@ public class TestQueryRecord {
personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("jobLevel", RecordFieldType.ENUM.getDataType()));
personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType())));
personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@@ -847,7 +844,6 @@ public class TestQueryRecord {
map.put("age", 30);
map.put("height", 60.5);
map.put("title", "Software Engineer");
map.put("jobLevel", JobLevel.IC2);
map.put("addresses", new Record[] {homeAddress, workAddress});
return new MapRecord(personSchema, map);
}
@@ -1285,10 +1281,4 @@ public class TestQueryRecord {
}
public enum JobLevel {
IC1,
IC2,
IC3
}
}