mirror of https://github.com/apache/nifi.git
NIFI-13389 Streamlined use of putAllAttributes() in ConsumeJMS
This closes #8955 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
fc01e0ccf4
commit
bd1ad8f9f4
|
@ -61,7 +61,6 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -364,12 +363,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
||||||
|
|
||||||
final Map<String, String> jmsHeaders = response.getMessageHeaders();
|
final Map<String, String> jmsHeaders = response.getMessageHeaders();
|
||||||
final Map<String, String> jmsProperties = response.getMessageProperties();
|
final Map<String, String> jmsProperties = response.getMessageProperties();
|
||||||
|
Map<String, String> attributes = mergeJmsAttributes(jmsHeaders, jmsProperties);
|
||||||
flowFile = updateFlowFileAttributesWithJMSAttributes(mergeJmsAttributes(jmsHeaders, jmsProperties), flowFile, processSession);
|
attributes.put(JMS_SOURCE_DESTINATION_NAME, destinationName);
|
||||||
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
|
attributes.put(JMS_MESSAGETYPE, response.getMessageType());
|
||||||
flowFile = processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
|
return processSession.putAllAttributes(flowFile, attributes);
|
||||||
|
|
||||||
return flowFile;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
|
private void processMessageSet(ProcessContext context, ProcessSession session, JMSConsumer consumer, String destinationName, String errorQueueName,
|
||||||
|
@ -496,22 +493,6 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Copies JMS attributes (i.e., headers and properties) as FF attributes.
|
|
||||||
* Given that FF attributes mandate that values are of type String, the
|
|
||||||
* copied values of JMS attributes will be "stringified" via
|
|
||||||
* String.valueOf(attribute).
|
|
||||||
*/
|
|
||||||
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, String> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
|
|
||||||
Map<String, String> attributes = new HashMap<>();
|
|
||||||
for (Entry<String, String> entry : jmsAttributes.entrySet()) {
|
|
||||||
attributes.put(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
flowFile = processSession.putAllAttributes(flowFile, attributes);
|
|
||||||
return flowFile;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, String> mergeJmsAttributes(Map<String, String> headers, Map<String, String> properties) {
|
private Map<String, String> mergeJmsAttributes(Map<String, String> headers, Map<String, String> properties) {
|
||||||
final Map<String, String> jmsAttributes = new HashMap<>(headers);
|
final Map<String, String> jmsAttributes = new HashMap<>(headers);
|
||||||
properties.forEach((key, value) -> {
|
properties.forEach((key, value) -> {
|
||||||
|
|
Loading…
Reference in New Issue