Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs
When broker's advisory is disabled (supportAdvisory=false) any
advisory consumer won't get created at broker and the advisory
consumer ID won't be stored.
Legacy openwire clients can have a reference of advisory consumer
regardless broker's settings and therefore when it closes the
advisory consumer the broker has no reference to it.
Therefore broker throws an exception like:
javax.jms.IllegalStateException: Cannot
remove a consumer that had not been registered
If the broker stores the consumer info (even it doesn't create
it) the exception can be avoided.
There's a *slight* semantic change with the behavior of the queue query
and binding query to make them consistent with the address query, namely
that they will return the name of the queue and the name of the address
in every case and the returned names will be not use the FQQN syntax but
will be parsed to reflect their actual names in the broker.
When trying to get the bindings for an address the getBindingsForAddress
method will create a Bindings instance if there are no bindings for the
address. This is unnecessary in most circumstances so use the
lookupBindingsForAddress method instead and check for null.
If a client sends a message to a multicast address and using a qpid-jms
client to receive the message from one of the queues using fully
qualified queue name will fail with following error message:
Address xxxx is not configured for queue support
[condition = amqp:illegal-state]
It should be able to receive the message without any error.
These improvements were also part of this task:
- Routing is now cached as much as possible.
- A new Runnable is avoided for each individual message,
since we use the Netty executor to perform delivery
https://issues.apache.org/jira/browse/ARTEMIS-2205
Update to latest proton-j release and refactor the dispostion code to use
the new type enums to better deal with the dispistions. Updates to Qpid JMS
0.37.0 which still uses the current netty 4.1.28.Final dependency.
Refactor ServerJMSMessage so it correctly transposes all JMSX headers.
Push common JMSX mappings for JMS to Message Interface mappings into MessageUtil to avoid duplication in ActiveMQMessage and ServerJMSMessage
With AMQP protocol when some messages are received in a transaction,
calling JMX QueueControl.listDeliveringMessages() returns empty list
before the transaction is committed.
Add test cases
Add GroupSequence to Message Interface
Implement Support closing/reset group in queue impl
Update Documentation (copy from activemq5)
Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html
Implement custom LVQ Key and Non-Destructive in broker - protocol agnostic
Make feature configurable via broker.xml, core apis and activemqservercontrol
Add last-value-key test cases
Add non-destructive with lvq test cases
Add non-destructive with expiry-delay test cases
Update documents
Add new methods to support create, update with new attributes
Refactor to pass through queue-attributes in client side methods to reduce further method changes for adding new attributes in future and avoid methods with endless parameters. (note: in future this should prob be done server side too)
Update existing test cases and fake impls for new methods/attributes
GlobalDiskFullTest was broken before this fix.
Basically when using multiple addresses over a session you would miss flow credits on all your producers except to the first one
that ran out of credit.
This is fixing the test org.apache.activemq.artemis.tests.integration.amqp.DivertTopicToQueueTest
This test was broken because the copy wouldn't use the Buffer view gotten by data.duplicate().
Add Concurrency Test to expose concurrency errors seen in logs.
Add Fix to ensure TypedProperties to ensure threadsafety
Add forEach and forEachKey to allow for provide a thread safe way of iterating through keys and values, without needing to duplicate the collection.
Add getMapNames method to remove code duplication and to ensure thread safe
Major refactoring of the AMQPMessage abstraction to resolve
some issue of message corruption still present in the code and
improve the API handling of message changes and re-encoding.
Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done. Fixes issues of
corrupt state on copy of message or other changes in filters.
Ensure that the Body of the message is never decoded in the partial
decode phase of the message processing and also gaurd against the
decode of ApplicationProperties which should be done lazily. Add
lazy decode of DeliveryAnnotations as they are not used at present.
Cleans up some of the code on the proton event handler, most noteable:
1. Fix IOCallback creation on each outbound send, use single instance
as the handler only ever does a flush and has no attached state.
2. Fix redundent locking and unlocking of connection lock on the event
path that already ensures that lock is held.
3. Set presettle state on the server sender at attach as it cannot
change afterwards so checking on every message is not needed.
4. Improve buffer type checking on receive to reduce amount of work
Once a re-encode of the message is done the buffer is not being marked
as valid and so subsequent checks on the buffer are all assuming the
message data is not valid and re-encoding over and over. This can lead
to poor performance in some cases and corrupted data in others.
Avoid firing the offerProducerCredit code when we know that the credit
isnt low enough that a refill is needed, which avoids lock contention
and garbage creation as each inbound message is processed.
Ensure the broker looks at local receiver credit when checking for
credit top off threshold and then do a proper top off back to the high
water mark to sync with how client receivers manage their credit.
Update the Qpid JMS and Proton dependencies to lastest and sync Netty
with the 4.1.28.Final version used by Qpid JMS to avoid clash that
breaks a test. Adds override of new Proton-J WritableBuffer API that
allows it to use the Netty String encoder when needed instead of the
slower default version.
Update Qpid JMS to v0.36.0
Proton-J to v0.29.0
Netty to 4.1.28.Final
An OpenWire client can use a compound destination name of the form
"a,b,c..." and consume from, or subscribe to, multiple destinations.
Such a compound destination only works for topics when the subscriber
is non-durable. Attempting to create a durable subscription on a
compound address will end up with an error.
The cause is when creating durable subs to multiple topics/addresses
the broker uses the same name to create internal queues, which
causes duplicate name conflict.
Anonymous senders (those created without a target address) are not
blocked when max-disk-usage is reached. The cause is that when such
a sender is created on the broker, the broker doesn't check the
disk/memory usage and gives out the credit immediately.
When "large" messages are converted to / from core in order to be stored
in the large message store the type of the AMQP body section is being
lost and reconstituted incorrectly in some cases. The message needs to
be annotated with the original AMQP type for the body and that used to
manage the conversion back to AMQP from Core.
Avoids pooling corner cases interacting with ARTEMIS 1843 + ARTEMIS 1861 improvements.
Also tagging ARTEMIS-1941 to note test needs altered along with it.
updates max frame size tests to verify behaviour seen with standalone
brokers rather than non represenative test-only conditions, as well
as more closely validate the recieved messages
1. Add tests case to verify issue and fix, tests also tests for same behavior using CORE, OPENWIRE and AMQP JMS Clients.
2. Update Core Client to check for queue before creating, sharedQueue as per createQueue logic.
3. Update ServerSessionPacketHandler to handle packets from old clients to perform to implement the same fix server side for older clients.
4. Correct AMQP protocol so correct error code is returned on security exception so that amqp jms can correctly throw JMSsecurityException
5. Correct AMQP protocol to check for queue exists before create
6. Correct OpenWire protocol to check for address exists before create
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
(Perf improvement)
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
Configure a value of 128KB for AMQP max frame size by default to improve
overall performance and provide a limit on delivery size before chunking
begins.
Use new no copy variants for the delivery send and receive and make
use of the ReadableBuffer type that is now used to convery tranfer
payloads without a copy. Also set max outgoing frame size to match
the configured maxFrameSize for the AMQP protocol head to avoid the
case where an overly large frame can be written instead of chunking
a large message.
OpenWireMessageConverter::toAMQMessage on bytes messages is lazy
allocating a write buffer with a default size of 1024 even when
it won't be used to write anything.
It avoid an useless allocation by reducing it to new byte[0].
messageAcknowledged plugin callback methods
Knowing the consumer that expired or acked a message (if available) is
useful and right now a message reference only contains a consumer id
which by itself is not unique so the actual consumer needs to be passed
Make sure the correct buffer is used when decoding the stored Core
message that originated from the conversion of an AMQP message sent and
annotated as a JMS ObjectMessage which trips the large message boundary.
It includes:
- Message References: no longer uses boxed primitives and AtomicInteger
- Node: intrusive nodes no longer need a reference field holding itself
- RefCountMessage: no longer uses AtomicInteger, but AtomicIntegerFieldUpdater
Alternate patch that doesn't copy the message bytes unless doing a
redelivery or skipping delivery annotations in the original version of
the message. Proton-J will copy the bytes provided to the Sender's send
method so a copy isn't necessary on most common sends.
connection
To prevent a socket from hanging open by a bad client the broker should
make sure to stop the transport if a connection attempt fails by an
OpenWire client
Check for AMQSession being null before handling various TX state checks
in order to ensure the correct errors are thrown and TX rollback is
handled properly.
OpenWireFormat instances are shared between OpenWire connections/sessions/consumers, preventing the clients to scale due to the synchronized marshal/unmarshal on it.
It includes:
- direct transport buffer pooling
- groupId SimpleString pooling
- clientId SimpleString pooling
- reduced ActiveMQDestination[] and AtomicLong allocations on AMQSession:send
- reduced ActiveMQDestination allocations
- refactored shouldBlockProducer path of AMQPSession::send to reduce method size
- exclusive OpenWireFormat per session and connection (in/out) to avoid contention
- refactored trace log to favour inlining
- changed lastSent volatile set into lazy set to avoid full barrier cost on x86
- stateless OpenWireMessageConverter
- send's lock removal thanks to thread-safe NettyConnection
This is good when you are a customer and an artemis engineer (e.g. me) asks your journal print-data but you can't do it because that would expose your user's data. If you do artemis data print --safe, that will only expose the journal structure without exposing user's data and eliminate any liability between the engineer and users.
Adding new metrics for tracking message counts and sizes on a Queue.
This includes tracking metrics for pending, delivering and scheduled
messages. The paging store also tracks message size now.
Support exlusive consumer
Allow default address level settings for exclusive consumer
Allow queue level setting in broker.xml
Add the ability to set queue settings via Core JMS using address. Similar to ActiveMQ 5.X
Allow for Core JMS client to define exclusive consumer using address parameters
Add tests
HornetQClientProtocolManager is used to connect HornteQ servers.
During reconnect, it sends a CheckFailoverMessage packet to the
server as part of reconnection. This packet is not supported by
HornetQ server (existing release), so it will break the backward
compatibility.
Also fixed a failover issue where a hornetq NettyConnector's
ConnectorFactory is serialized to the clients who cannot
instantiate it because class not found exception.
Flag needs to be set when auto creating an address so that the address
can be removed later if auto delete is configured when creating a
subscription with MQTT
* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
If the Stomp consumer was closed at or near the same time a message was
dispatched then an NPE might result. Throwing an exception is a
relatively expensive operation in the JVM because of the stacktrace
information that needs to be generated, so in cases where it is known
that a null value could be returned one should check and handle it
appropriately.
When openwire client uses compressed option to send messages
(jms.useCompression=true) openwire client failed to receive them.
The reason is in OpenwireMessageConverter.toAMQMessage():
1. message.setContent() should be called after setting properties
(It will cause the compressed content to decompressed before delivering to clients)
2. message.onSend() should not be called here (it should be used
by producers. If used here it changes the internal flags of the
message and cause receive to fail).
Apply fix so that when using JNDI via tomcat resource it works.
Replace original extract of JNDIStorable taken from Qpid, and use ActiveMQ5's as fits better to address this issue. (which primary use case is users migrating from 5.x)
Refactored ActiveMQConnectionFactory to externalise and turn into reference by StringRefAddr's instead of custom RefAddr which isnt standard.
Refactored ActiveMQDestinations similar
Refactored ActiveMQDestination to remove redundent and duplicated name field and ensured getters still behave the same
Unsubscribe topic in clustered environment left open references to the core consumer. This patch properly closes the consumer which results in correct removal of the consumer reference on a remote queue.
Use Map<String, Object> to access the ApplicationProperties section
which is the spec defined type for that section. This will prevent
breakage should proton-j be updated to reflect that in the definition of
that class.
This will cache the last query, optimizing most of the cases
This won't optimize the case where you are sending producers with different address,
but this is not the one I'm after now.
Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumer shouldn't be exposed via
management APIs which are used by the Console
To fix that the broker doesn't register any queues from
advisory addresses.
Also refactors a code to remove Openwire specific contants
from AddressInfo class.
I'm doing an overal improvement on large message support for AMQP
However this commit is just about a Bug on the converter.
It will be moot after all the changes I'm making, but I would rather keep this separate
as a way to cherry-pick on previous versions eventually.
Extend test cases in MessageTypesTest to cover Core to AMQP combinations for all JMSTypeTests
Fix ServerJMSBytesMessage to correctly return bodyLength
Remove unused/dead code
Allows for JMS selectors on JMSCorrelationID as well as JMSXGroupID
and JMSXUserID along with some fixes to avoid an NPE case and fixes
to the conversion of AMQP MessageID and CorrelationID values when
doing cross protocol mappings. Adds new tests to cover more cases
of using the JMS selector with Qpid JMS and the AMQP test client.
add the adaptTransportConfiguration() method to the
ClientProtocolManagerFactory so that transport configurations used by
the ClientProtocolManager have an opportunity to adapt their transport
configuration.
This allows the HornetQClientProtocolManagerFactory to adapt the
transport configuration received by remote HornetQ broker to replace the
HornetQ-based NettyConnectorFactory by the Artemis-based one.
JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431
If message senders and receivers uses different
wireformat.tightEncodingEnabled options, broker will get marshalling
problem. This is because when openwire messages are converted to
core messages, and later these core messages converted to openwire
messages, the broker uses a mashaller that comes with the connection
used to carry the messages.
For example, if a producer sents a message using option "wireformat
.tightEncodingEnabled=false" and a receiver tries to receive it
using 'true' for the same option, it'll never get it because the
broker will fail to use a "tight encoding" marshaller to
decode a 'loose encoded' message.
To fix the problem, we always use 'tight encoding' for internal
message converters.
Remove some redundant code in the processing of the OpenWire Destination
name, both in the topic handler and in the method in general the
destination name only needs to be converted once and only one
SimpleString instance needs to be created from that which will then feed
all the places that either the String form or the SimpleString form of
the result is needed.
By default, every openwire connection will create a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If a openwire client is create temporary queues these queues
will fill up with messages for as long as the associated
openwire connection is alive. It appears these messages
do not get consumed from the queues.
The reason behind is that advisory messages don't require
acknowledgement so the messages stay at the queue.
Remove the unused transaction remove method as the getTransaction method
provides the remove operation and the remove method was doing an invalud
remove of wrong type from the transactions collection.
Added test case for cross protocol on JMSDeliveryMode proving issue, and asserting fix
Added fix to AmqpCoreConverter to ensure durability (JMSDeliveryMode) is retained.
Similar issue spotted with JMSPriority as with JMSDeliveyMode, fixing at the same time.
Added extra test case for jmspriority
Added fix for jmspriority