If a jms client (be it openwire, amqp, or core jms) receives a message that
is from a different protocol, the JMSMessageID maybe null when the
jms client expects it.
Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs
When broker's advisory is disabled (supportAdvisory=false) any
advisory consumer won't get created at broker and the advisory
consumer ID won't be stored.
Legacy openwire clients can have a reference of advisory consumer
regardless broker's settings and therefore when it closes the
advisory consumer the broker has no reference to it.
Therefore broker throws an exception like:
javax.jms.IllegalStateException: Cannot
remove a consumer that had not been registered
If the broker stores the consumer info (even it doesn't create
it) the exception can be avoided.
There's a *slight* semantic change with the behavior of the queue query
and binding query to make them consistent with the address query, namely
that they will return the name of the queue and the name of the address
in every case and the returned names will be not use the FQQN syntax but
will be parsed to reflect their actual names in the broker.
When trying to get the bindings for an address the getBindingsForAddress
method will create a Bindings instance if there are no bindings for the
address. This is unnecessary in most circumstances so use the
lookupBindingsForAddress method instead and check for null.
Refactor ServerJMSMessage so it correctly transposes all JMSX headers.
Push common JMSX mappings for JMS to Message Interface mappings into MessageUtil to avoid duplication in ActiveMQMessage and ServerJMSMessage
Add test cases
Add GroupSequence to Message Interface
Implement Support closing/reset group in queue impl
Update Documentation (copy from activemq5)
Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html
Implement custom LVQ Key and Non-Destructive in broker - protocol agnostic
Make feature configurable via broker.xml, core apis and activemqservercontrol
Add last-value-key test cases
Add non-destructive with lvq test cases
Add non-destructive with expiry-delay test cases
Update documents
Add new methods to support create, update with new attributes
Refactor to pass through queue-attributes in client side methods to reduce further method changes for adding new attributes in future and avoid methods with endless parameters. (note: in future this should prob be done server side too)
Update existing test cases and fake impls for new methods/attributes
An OpenWire client can use a compound destination name of the form
"a,b,c..." and consume from, or subscribe to, multiple destinations.
Such a compound destination only works for topics when the subscriber
is non-durable. Attempting to create a durable subscription on a
compound address will end up with an error.
The cause is when creating durable subs to multiple topics/addresses
the broker uses the same name to create internal queues, which
causes duplicate name conflict.
1. Add tests case to verify issue and fix, tests also tests for same behavior using CORE, OPENWIRE and AMQP JMS Clients.
2. Update Core Client to check for queue before creating, sharedQueue as per createQueue logic.
3. Update ServerSessionPacketHandler to handle packets from old clients to perform to implement the same fix server side for older clients.
4. Correct AMQP protocol so correct error code is returned on security exception so that amqp jms can correctly throw JMSsecurityException
5. Correct AMQP protocol to check for queue exists before create
6. Correct OpenWire protocol to check for address exists before create
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
(Perf improvement)
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
OpenWireMessageConverter::toAMQMessage on bytes messages is lazy
allocating a write buffer with a default size of 1024 even when
it won't be used to write anything.
It avoid an useless allocation by reducing it to new byte[0].
messageAcknowledged plugin callback methods
Knowing the consumer that expired or acked a message (if available) is
useful and right now a message reference only contains a consumer id
which by itself is not unique so the actual consumer needs to be passed
It includes:
- Message References: no longer uses boxed primitives and AtomicInteger
- Node: intrusive nodes no longer need a reference field holding itself
- RefCountMessage: no longer uses AtomicInteger, but AtomicIntegerFieldUpdater
connection
To prevent a socket from hanging open by a bad client the broker should
make sure to stop the transport if a connection attempt fails by an
OpenWire client
Check for AMQSession being null before handling various TX state checks
in order to ensure the correct errors are thrown and TX rollback is
handled properly.
OpenWireFormat instances are shared between OpenWire connections/sessions/consumers, preventing the clients to scale due to the synchronized marshal/unmarshal on it.
It includes:
- direct transport buffer pooling
- groupId SimpleString pooling
- clientId SimpleString pooling
- reduced ActiveMQDestination[] and AtomicLong allocations on AMQSession:send
- reduced ActiveMQDestination allocations
- refactored shouldBlockProducer path of AMQPSession::send to reduce method size
- exclusive OpenWireFormat per session and connection (in/out) to avoid contention
- refactored trace log to favour inlining
- changed lastSent volatile set into lazy set to avoid full barrier cost on x86
- stateless OpenWireMessageConverter
- send's lock removal thanks to thread-safe NettyConnection
Adding new metrics for tracking message counts and sizes on a Queue.
This includes tracking metrics for pending, delivering and scheduled
messages. The paging store also tracks message size now.
* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
When openwire client uses compressed option to send messages
(jms.useCompression=true) openwire client failed to receive them.
The reason is in OpenwireMessageConverter.toAMQMessage():
1. message.setContent() should be called after setting properties
(It will cause the compressed content to decompressed before delivering to clients)
2. message.onSend() should not be called here (it should be used
by producers. If used here it changes the internal flags of the
message and cause receive to fail).
Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumer shouldn't be exposed via
management APIs which are used by the Console
To fix that the broker doesn't register any queues from
advisory addresses.
Also refactors a code to remove Openwire specific contants
from AddressInfo class.
If message senders and receivers uses different
wireformat.tightEncodingEnabled options, broker will get marshalling
problem. This is because when openwire messages are converted to
core messages, and later these core messages converted to openwire
messages, the broker uses a mashaller that comes with the connection
used to carry the messages.
For example, if a producer sents a message using option "wireformat
.tightEncodingEnabled=false" and a receiver tries to receive it
using 'true' for the same option, it'll never get it because the
broker will fail to use a "tight encoding" marshaller to
decode a 'loose encoded' message.
To fix the problem, we always use 'tight encoding' for internal
message converters.
Remove some redundant code in the processing of the OpenWire Destination
name, both in the topic handler and in the method in general the
destination name only needs to be converted once and only one
SimpleString instance needs to be created from that which will then feed
all the places that either the String form or the SimpleString form of
the result is needed.
By default, every openwire connection will create a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If a openwire client is create temporary queues these queues
will fill up with messages for as long as the associated
openwire connection is alive. It appears these messages
do not get consumed from the queues.
The reason behind is that advisory messages don't require
acknowledgement so the messages stay at the queue.
delegate to the jdk saslServer. Allow acceptor configuration of supported mechanismis; saslMechanisms=<a,b>
and allow login config scope for krb5 to be configured via saslLoginConfigScope=x
Add krb5sslloginmodule that will populate userPrincipal that can be mapped to roles independently
Generalised callback handlers to take a connection and pull certs or peerprincipal based on
callback. This bubbled up into api change in securitystore and security manager
Use AcitveMQDestination for subscription naming, fixing and aligning queue naming in the process.
The change is behind a configuration toggle so to avoid causing any breaking changes for uses not expecting.
As part of my refactoring on AMQP, the broker shouldn't rely on Application properties
for any broker semantic changes on delivery.
I am removing any access to those now, so we can properly deal with this post 2.0.0.
with this we could send and receive message in their raw format,
without requiring conversions to Core.
- MessageImpl and ServerMessage are removed as part of this
- AMQPMessage and CoreMessage will have the specialized message format for each protocol
- The protocol manager is now responsible to send the message
- The message will provide an encoder for journal and paging
When openwire sends back an exception response, it doesn't set
the correct correlation id. This causes the client to miss the
response and the exception won't get caught.
To fix it we need to add the correlation id before sending.
When sending an empty ObjectMessage, broker doesn't
write a 'length' field to the message buffer. In delivery
the broker tries to read the length from the buffer, which
causes "IndexOutOfBoundsException".
To fix it, we need to check if the buffer is empty or not,
and only read it if the buffer is not empty.
When a producer sends a messages to a temp destination created from
another connection, it fails. The reason behind it is that the
producer's connection didn't receive the advisory message (notification)
from broker about this temp destination, and it will throw an exception
if it doesn't know this temp destination.
The fix is send the advisory to the client so that it knows this destination.
When creating a 'no-local' openwire consumer, it doesn't work,
meaning it can still receive messages from the same connection.
The fix is similar to what Artemis client does, which is adding
a 'filter' to the consumer/subscription.
The difference is that with OpenWire we have to do it on the
broker side.