NIFI-12478 Return Message Type as body for JMS Object Messages (#8131)

This commit is contained in:
David Handermann 2023-12-07 15:41:34 -06:00 committed by GitHub
parent b9dbcab160
commit f73888e7dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 9 additions and 24 deletions

View File

@ -38,6 +38,7 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
@ -184,7 +185,8 @@ class JMSConsumer extends JMSWorker {
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else if (message instanceof ObjectMessage) { } else if (message instanceof ObjectMessage) {
messageType = ObjectMessage.class.getSimpleName(); messageType = ObjectMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); // Return Message Type as body to avoid unsupported class references
messageBody = messageType.getBytes(StandardCharsets.UTF_8);
} else if (message instanceof StreamMessage) { } else if (message instanceof StreamMessage) {
messageType = StreamMessage.class.getSimpleName(); messageType = StreamMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);

View File

@ -31,13 +31,11 @@ import javax.jms.BytesMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage; import javax.jms.MapMessage;
import javax.jms.MessageEOFException; import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage; import javax.jms.StreamMessage;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
/** /**
* *
@ -88,19 +86,6 @@ abstract class MessageBodyToBytesConverter {
} }
} }
/**
*
* @param message instance of {@link ObjectMessage}
* @return byte array representing the {@link ObjectMessage}
*/
public static byte[] toBytes(ObjectMessage message) {
try {
return SerializationUtils.serialize(message.getObject());
} catch (Exception e) {
throw new MessageConversionException("Failed to convert " + ObjectMessage.class.getSimpleName() + " to byte[]", e);
}
}
/** /**
* @param message instance of {@link StreamMessage} * @param message instance of {@link StreamMessage}
* @return byte array representing the {@link StreamMessage} * @return byte array representing the {@link StreamMessage}

View File

@ -18,7 +18,6 @@ package org.apache.nifi.jms.processors;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -70,9 +69,9 @@ public class JMSPublisherConsumerIT {
}; };
Consumer<JMSResponse> responseChecker = response -> { Consumer<JMSResponse> responseChecker = response -> {
assertEquals( assertArrayEquals(
"stringAsObject", ObjectMessage.class.getSimpleName().getBytes(StandardCharsets.UTF_8),
SerializationUtils.deserialize(response.getMessageBody()) response.getMessageBody()
); );
}; };
@ -447,7 +446,7 @@ public class JMSPublisherConsumerIT {
} }
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody())); assertEquals("2", new String(response.getMessageBody()));
acknowledge(response); acknowledge(response);
}); });
} }
@ -464,7 +463,7 @@ public class JMSPublisherConsumerIT {
} }
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody())); assertEquals("1", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack"); throw new RuntimeException("intentional to avoid explicit ack");
}); });
} }
@ -483,7 +482,7 @@ public class JMSPublisherConsumerIT {
} }
callbackInvoked.set(true); callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody())); assertEquals("1", new String(response.getMessageBody()));
acknowledge(response); acknowledge(response);
}); });
} }

View File

@ -1166,7 +1166,6 @@
!ITDeleteAzureBlobStorage_v12, !ITDeleteAzureBlobStorage_v12,
!ITMoveAzureDataLakeStorage, !ITMoveAzureDataLakeStorage,
!AzureGraphUserGroupProviderIT, !AzureGraphUserGroupProviderIT,
!JMSPublisherConsumerIT#validateMessageRedeliveryWhenNotAcked,
!GremlinClientServiceYamlSettingsAndBytecodeIT, !GremlinClientServiceYamlSettingsAndBytecodeIT,
!GremlinClientServiceControllerSettingsIT, !GremlinClientServiceControllerSettingsIT,
!ITestConsumeEmail#validateUrl, !ITestConsumeEmail#validateUrl,