diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java deleted file mode 100644 index 0e058f467e..0000000000 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.apache.nifi.processors.helloworld; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.Stateful; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnShutdown; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateManager; -import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -@Tags("state") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@Stateful(scopes = Scope.CLUSTER, description = "") -public class HelloStateProcessor extends AbstractProcessor { - - private static final String COUNTER_KEY = "counter"; - private static final String TIMESTAMP_KEY = "timestamp"; - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .build(); - - public static final Set RELATIONSHIPS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); - - @Override - public Set getRelationships() { - return RELATIONSHIPS; - } - - @Override - protected void init(ProcessorInitializationContext context) { - getLogger().info("init"); - } - - @Override - protected Collection customValidate(ValidationContext validationContext) { - getLogger().info("customValidate"); - return Collections.emptyList(); - } - - @OnScheduled - public void onScheduled(ProcessContext context) throws IOException { - getLogger().info("onScheduled"); - if (getNodeTypeProvider().isPrimary()) { - final StateManager stateManager = context.getStateManager(); - final StateMap state = stateManager.getState(Scope.CLUSTER); - - if (!state.getStateVersion().isPresent()) { - stateManager.setState(new HashMap<>(), Scope.CLUSTER); - } - } - } - - @OnUnscheduled - public void onUnscheduled() { - getLogger().info("onUnscheduled"); - } - - @OnStopped - public void onStopped() { - getLogger().info("onStopped"); - } - - @OnShutdown - public void onShutdown() { - getLogger().info("onShutdown"); - } - -// public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { -// try { -// getLogger().info("onTrigger"); -// -// FlowFile flowFile = session.get(); -// if (flowFile == null) { -// getLogger().info("Null FlowFile"); -// return; -// } -// -// StateMap oldState = session.getState(Scope.CLUSTER); -// -// Map stateMap = new HashMap<>(oldState.toMap()); -// -// String counterStr = stateMap.get(COUNTER_KEY); -// -// int counter = counterStr != null ? Integer.parseInt(counterStr) : 0; -// counter++; -// stateMap.put(COUNTER_KEY, Integer.toString(counter)); -// -// stateMap.put(TIMESTAMP_KEY, LocalDateTime.now().toString()); -// -// boolean success = session.replaceState(oldState, stateMap, Scope.CLUSTER); // reread state -// -// if (success) { -// session.transfer(flowFile, REL_SUCCESS); -// } else { -// session.transfer(flowFile, REL_FAILURE); -// } -// } catch (Exception e) { -// getLogger().error("HelloWorldProcessor failure", e); -// throw new ProcessException("HelloWorldProcessor failure", e); -// } -// } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - try { - getLogger().info("onTrigger"); - - FlowFile flowFile = session.get(); - if (flowFile == null) { - getLogger().info("Null FlowFile"); - return; - } - - StateManager stateManager = context.getStateManager(); - - StateMap oldState = stateManager.getState(Scope.CLUSTER); - - Map stateMap = new HashMap<>(oldState.toMap()); - - String counterStr = stateMap.get(COUNTER_KEY); - - int counter = counterStr != null ? Integer.parseInt(counterStr) : 0; - counter++; - stateMap.put(COUNTER_KEY, Integer.toString(counter)); - - boolean success = stateManager.replace(oldState, stateMap, Scope.CLUSTER); - - if (success) { - session.transfer(flowFile, REL_SUCCESS); - } else { - session.transfer(flowFile, REL_FAILURE); - - } - } catch (Exception e) { - getLogger().error("HelloWorldProcessor failure", e); - throw new ProcessException("HelloWorldProcessor failure", e); - } - } -} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java deleted file mode 100644 index c30c80e039..0000000000 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java +++ /dev/null @@ -1,157 +0,0 @@ -package org.apache.nifi.processors.helloworld; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.configuration.DefaultSchedule; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnShutdown; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.FlowFileFilter; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; - -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -@Tags("hello") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -public class HelloWorldProcessor extends AbstractProcessor { - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .build(); - - public static final Set RELATIONSHIPS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); - - @Override - public Set getRelationships() { - return RELATIONSHIPS; - } - - @Override - protected void init(ProcessorInitializationContext context) { - getLogger().info("init"); - } - - @Override - protected Collection customValidate(ValidationContext validationContext) { - getLogger().info("customValidate"); - return Collections.emptyList(); - } - - @OnScheduled - public void onScheduled() { - getLogger().info("onScheduled"); - } - - @OnUnscheduled - public void onUnscheduled() { - getLogger().info("onUnscheduled"); - } - - @OnStopped - public void onStopped() { - getLogger().info("onStopped"); - } - - @OnShutdown - public void onShutdown() { - getLogger().info("onShutdown"); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - try { - getLogger().info("onTrigger"); - - FlowFile flowFile = session.get(); - if (flowFile == null) { - getLogger().info("Null FlowFile"); - return; - } - -// List flowFiles = session.get(ff -> { -// //String uuid = ff.getAttribute(CoreAttributes.UUID.key()); -// String uuid = ff.getAttribute("my.uuid"); -// if (uuid != null && uuid.equals("just this")) { -// return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE; -// } else { -// return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE; -// } -// }); -// if (flowFiles.isEmpty()) { -// getLogger().info("Null FlowFile"); -// return; -// } -// FlowFile flowFile = flowFiles.get(0); - - String name = flowFile.getAttribute("name"); - if (name == null) { - name = "Anonymous"; - } - - if ("Exception".equals(name)) { - throw new ProcessException("exception"); - // => session.rollback(true) in AbstractProcessor (if thrown from onTrigger()) - } - - if ("Yield".equals(name)) { - context.yield(); - session.remove(flowFile); - return; - } - - if ("Penalize".equals(name)) { - session.penalize(flowFile); - session.rollback(); - return; - // => no effect, will be rolled back - // same as Rollback case below - // session.rollback(true) should be used or ProcessException should be thrown - } - - if ("Rollback".equals(name)) { - session.rollback(); - return; - // => retriggered immediately - } - - if ("Failure".equals(name)) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - - String greeting = String.format("Hello %s!", name); - getLogger().info(greeting); - - try (OutputStream os = session.write(flowFile)) { - os.write(greeting.getBytes()); - } - - session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { - getLogger().error("HelloWorldProcessor failure", e); - throw new ProcessException("HelloWorldProcessor failure", e); - } - } -} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java index e462972387..a2debbdce5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -227,8 +227,6 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran return typeFactory.createJavaType(BigInteger.class); case DECIMAL: return typeFactory.createJavaType(BigDecimal.class); - case ENUM: - return typeFactory.createJavaType(Enum.class); case CHOICE: final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType; DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index d8a9a1f069..20f6dca199 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -251,7 +251,7 @@ public class TestQueryRecord { runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(REL_NAME, - "SELECT title, name, jobLevel" + + "SELECT title, name" + " FROM FLOWFILE" + " WHERE CARDINALITY(addresses) > 1"); @@ -270,7 +270,6 @@ public class TestQueryRecord { final Record output = written.get(0); assertEquals("John Doe", output.getValue("name")); assertEquals("Software Engineer", output.getValue("title")); - assertEquals(JobLevel.IC2, output.getValue("jobLevel")); } @Test @@ -778,7 +777,6 @@ public class TestQueryRecord { * { * "name": "John Doe", * "title": "Software Engineer", - * "jobLevel": "IC2", * "age": 40, * "addresses": [{ * "streetNumber": 4820, @@ -817,7 +815,6 @@ public class TestQueryRecord { personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); personFields.add(new RecordField("age", RecordFieldType.INT.getDataType())); personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType())); - personFields.add(new RecordField("jobLevel", RecordFieldType.ENUM.getDataType())); personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType()))); personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) )); final RecordSchema personSchema = new SimpleRecordSchema(personFields); @@ -847,7 +844,6 @@ public class TestQueryRecord { map.put("age", 30); map.put("height", 60.5); map.put("title", "Software Engineer"); - map.put("jobLevel", JobLevel.IC2); map.put("addresses", new Record[] {homeAddress, workAddress}); return new MapRecord(personSchema, map); } @@ -1285,10 +1281,4 @@ public class TestQueryRecord { } - public enum JobLevel { - IC1, - IC2, - IC3 - } - }