This commit is contained in:
Clebert Suconic 2017-04-11 19:10:23 -04:00
commit d531c5aac5
6 changed files with 28 additions and 9 deletions

View File

@ -305,8 +305,11 @@ public interface Message {
/**
* Returns the userID - this is an optional user specified UUID that can be set to identify the message
* and will be passed around with the message
*
* This represents historically the JMSMessageID.
* We had in the past used this for the MessageID that was sent on core messages...
*
* later on when we added AMQP this name clashed with AMQPMessage.getUserID();
*
* @return the user id
*/

View File

@ -510,6 +510,22 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getUserID() {
Properties properties = getProperties();
if (properties != null && properties.getMessageId() != null) {
return properties.getMessageId();
} else {
return null;
}
}
/**
* Before we added AMQP into Artemis / Hornetq, the name getUserID was already taken by JMSMessageID.
* We cannot simply change the names now as it would break the API for existing clients.
*
* This is to return and read the proper AMQP userID.
* @return
*/
public Object getAMQPUserID() {
Properties properties = getProperties();
if (properties != null && properties.getUserId() != null) {
Binary binary = properties.getUserId();
@ -519,6 +535,7 @@ public class AMQPMessage extends RefCountMessage {
}
}
@Override
public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
return null;

View File

@ -147,7 +147,7 @@ public class AMQPMessageTest {
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getUserID());
assertEquals(USER_NAME, decoded.getAMQPUserID());
}
@Test

View File

@ -79,10 +79,7 @@ public class FilterImpl implements Filter {
try {
booleanExpression = SelectorParser.parse(filterStr.toString());
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.invalidFilter(filterStr);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Invalid filter", e);
}
ActiveMQServerLogger.LOGGER.invalidFilter(filterStr, e);
throw ActiveMQMessageBundle.BUNDLE.invalidFilter(e, filterStr);
}
return new FilterImpl(filterStr, booleanExpression);
@ -108,7 +105,7 @@ public class FilterImpl implements Filter {
boolean result = booleanExpression.matches(new FilterableServerMessage(message));
return result;
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.invalidFilter(sfilterString);
ActiveMQServerLogger.LOGGER.invalidFilter(sfilterString, e);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Invalid filter", e);
}

View File

@ -1320,7 +1320,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224006, value = "Invalid filter: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidFilter(SimpleString filter);
void invalidFilter(SimpleString filter, @Cause Throwable cause);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224007, value = "page subscription = {0} error={1}", format = Message.Format.MESSAGE_FORMAT)

View File

@ -264,6 +264,8 @@ public class ProtonTest extends ProtonTestBase {
Assert.assertEquals(1, addressControl.getQueueNames().length);
addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, userName, password);
Wait.waitFor(() -> addressControl.getMessageCount() == 1);
Assert.assertEquals(1, addressControl.getMessageCount());
Connection connection = createConnection("myClientId");