This is fixing the test org.apache.activemq.artemis.tests.integration.amqp.DivertTopicToQueueTest
This test was broken because the copy wouldn't use the Buffer view gotten by data.duplicate().
Add Concurrency Test to expose concurrency errors seen in logs.
Add Fix to ensure TypedProperties to ensure threadsafety
Add forEach and forEachKey to allow for provide a thread safe way of iterating through keys and values, without needing to duplicate the collection.
Add getMapNames method to remove code duplication and to ensure thread safe
Major refactoring of the AMQPMessage abstraction to resolve
some issue of message corruption still present in the code and
improve the API handling of message changes and re-encoding.
Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done. Fixes issues of
corrupt state on copy of message or other changes in filters.
Ensure that the Body of the message is never decoded in the partial
decode phase of the message processing and also gaurd against the
decode of ApplicationProperties which should be done lazily. Add
lazy decode of DeliveryAnnotations as they are not used at present.
Cleans up some of the code on the proton event handler, most noteable:
1. Fix IOCallback creation on each outbound send, use single instance
as the handler only ever does a flush and has no attached state.
2. Fix redundent locking and unlocking of connection lock on the event
path that already ensures that lock is held.
3. Set presettle state on the server sender at attach as it cannot
change afterwards so checking on every message is not needed.
4. Improve buffer type checking on receive to reduce amount of work
Once a re-encode of the message is done the buffer is not being marked
as valid and so subsequent checks on the buffer are all assuming the
message data is not valid and re-encoding over and over. This can lead
to poor performance in some cases and corrupted data in others.
Avoid firing the offerProducerCredit code when we know that the credit
isnt low enough that a refill is needed, which avoids lock contention
and garbage creation as each inbound message is processed.
Ensure the broker looks at local receiver credit when checking for
credit top off threshold and then do a proper top off back to the high
water mark to sync with how client receivers manage their credit.
Update the Qpid JMS and Proton dependencies to lastest and sync Netty
with the 4.1.28.Final version used by Qpid JMS to avoid clash that
breaks a test. Adds override of new Proton-J WritableBuffer API that
allows it to use the Netty String encoder when needed instead of the
slower default version.
Update Qpid JMS to v0.36.0
Proton-J to v0.29.0
Netty to 4.1.28.Final
An OpenWire client can use a compound destination name of the form
"a,b,c..." and consume from, or subscribe to, multiple destinations.
Such a compound destination only works for topics when the subscriber
is non-durable. Attempting to create a durable subscription on a
compound address will end up with an error.
The cause is when creating durable subs to multiple topics/addresses
the broker uses the same name to create internal queues, which
causes duplicate name conflict.
Anonymous senders (those created without a target address) are not
blocked when max-disk-usage is reached. The cause is that when such
a sender is created on the broker, the broker doesn't check the
disk/memory usage and gives out the credit immediately.
When "large" messages are converted to / from core in order to be stored
in the large message store the type of the AMQP body section is being
lost and reconstituted incorrectly in some cases. The message needs to
be annotated with the original AMQP type for the body and that used to
manage the conversion back to AMQP from Core.
Avoids pooling corner cases interacting with ARTEMIS 1843 + ARTEMIS 1861 improvements.
Also tagging ARTEMIS-1941 to note test needs altered along with it.
updates max frame size tests to verify behaviour seen with standalone
brokers rather than non represenative test-only conditions, as well
as more closely validate the recieved messages
1. Add tests case to verify issue and fix, tests also tests for same behavior using CORE, OPENWIRE and AMQP JMS Clients.
2. Update Core Client to check for queue before creating, sharedQueue as per createQueue logic.
3. Update ServerSessionPacketHandler to handle packets from old clients to perform to implement the same fix server side for older clients.
4. Correct AMQP protocol so correct error code is returned on security exception so that amqp jms can correctly throw JMSsecurityException
5. Correct AMQP protocol to check for queue exists before create
6. Correct OpenWire protocol to check for address exists before create
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
(Perf improvement)
If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
Configure a value of 128KB for AMQP max frame size by default to improve
overall performance and provide a limit on delivery size before chunking
begins.
Use new no copy variants for the delivery send and receive and make
use of the ReadableBuffer type that is now used to convery tranfer
payloads without a copy. Also set max outgoing frame size to match
the configured maxFrameSize for the AMQP protocol head to avoid the
case where an overly large frame can be written instead of chunking
a large message.
OpenWireMessageConverter::toAMQMessage on bytes messages is lazy
allocating a write buffer with a default size of 1024 even when
it won't be used to write anything.
It avoid an useless allocation by reducing it to new byte[0].
messageAcknowledged plugin callback methods
Knowing the consumer that expired or acked a message (if available) is
useful and right now a message reference only contains a consumer id
which by itself is not unique so the actual consumer needs to be passed
Make sure the correct buffer is used when decoding the stored Core
message that originated from the conversion of an AMQP message sent and
annotated as a JMS ObjectMessage which trips the large message boundary.
It includes:
- Message References: no longer uses boxed primitives and AtomicInteger
- Node: intrusive nodes no longer need a reference field holding itself
- RefCountMessage: no longer uses AtomicInteger, but AtomicIntegerFieldUpdater
Alternate patch that doesn't copy the message bytes unless doing a
redelivery or skipping delivery annotations in the original version of
the message. Proton-J will copy the bytes provided to the Sender's send
method so a copy isn't necessary on most common sends.
connection
To prevent a socket from hanging open by a bad client the broker should
make sure to stop the transport if a connection attempt fails by an
OpenWire client
Check for AMQSession being null before handling various TX state checks
in order to ensure the correct errors are thrown and TX rollback is
handled properly.
OpenWireFormat instances are shared between OpenWire connections/sessions/consumers, preventing the clients to scale due to the synchronized marshal/unmarshal on it.
It includes:
- direct transport buffer pooling
- groupId SimpleString pooling
- clientId SimpleString pooling
- reduced ActiveMQDestination[] and AtomicLong allocations on AMQSession:send
- reduced ActiveMQDestination allocations
- refactored shouldBlockProducer path of AMQPSession::send to reduce method size
- exclusive OpenWireFormat per session and connection (in/out) to avoid contention
- refactored trace log to favour inlining
- changed lastSent volatile set into lazy set to avoid full barrier cost on x86
- stateless OpenWireMessageConverter
- send's lock removal thanks to thread-safe NettyConnection
This is good when you are a customer and an artemis engineer (e.g. me) asks your journal print-data but you can't do it because that would expose your user's data. If you do artemis data print --safe, that will only expose the journal structure without exposing user's data and eliminate any liability between the engineer and users.
Adding new metrics for tracking message counts and sizes on a Queue.
This includes tracking metrics for pending, delivering and scheduled
messages. The paging store also tracks message size now.
Support exlusive consumer
Allow default address level settings for exclusive consumer
Allow queue level setting in broker.xml
Add the ability to set queue settings via Core JMS using address. Similar to ActiveMQ 5.X
Allow for Core JMS client to define exclusive consumer using address parameters
Add tests
HornetQClientProtocolManager is used to connect HornteQ servers.
During reconnect, it sends a CheckFailoverMessage packet to the
server as part of reconnection. This packet is not supported by
HornetQ server (existing release), so it will break the backward
compatibility.
Also fixed a failover issue where a hornetq NettyConnector's
ConnectorFactory is serialized to the clients who cannot
instantiate it because class not found exception.
Flag needs to be set when auto creating an address so that the address
can be removed later if auto delete is configured when creating a
subscription with MQTT
* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
If the Stomp consumer was closed at or near the same time a message was
dispatched then an NPE might result. Throwing an exception is a
relatively expensive operation in the JVM because of the stacktrace
information that needs to be generated, so in cases where it is known
that a null value could be returned one should check and handle it
appropriately.
When openwire client uses compressed option to send messages
(jms.useCompression=true) openwire client failed to receive them.
The reason is in OpenwireMessageConverter.toAMQMessage():
1. message.setContent() should be called after setting properties
(It will cause the compressed content to decompressed before delivering to clients)
2. message.onSend() should not be called here (it should be used
by producers. If used here it changes the internal flags of the
message and cause receive to fail).
Apply fix so that when using JNDI via tomcat resource it works.
Replace original extract of JNDIStorable taken from Qpid, and use ActiveMQ5's as fits better to address this issue. (which primary use case is users migrating from 5.x)
Refactored ActiveMQConnectionFactory to externalise and turn into reference by StringRefAddr's instead of custom RefAddr which isnt standard.
Refactored ActiveMQDestinations similar
Refactored ActiveMQDestination to remove redundent and duplicated name field and ensured getters still behave the same
Unsubscribe topic in clustered environment left open references to the core consumer. This patch properly closes the consumer which results in correct removal of the consumer reference on a remote queue.
Use Map<String, Object> to access the ApplicationProperties section
which is the spec defined type for that section. This will prevent
breakage should proton-j be updated to reflect that in the definition of
that class.
This will cache the last query, optimizing most of the cases
This won't optimize the case where you are sending producers with different address,
but this is not the one I'm after now.
Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumer shouldn't be exposed via
management APIs which are used by the Console
To fix that the broker doesn't register any queues from
advisory addresses.
Also refactors a code to remove Openwire specific contants
from AddressInfo class.
I'm doing an overal improvement on large message support for AMQP
However this commit is just about a Bug on the converter.
It will be moot after all the changes I'm making, but I would rather keep this separate
as a way to cherry-pick on previous versions eventually.
Extend test cases in MessageTypesTest to cover Core to AMQP combinations for all JMSTypeTests
Fix ServerJMSBytesMessage to correctly return bodyLength
Remove unused/dead code
Allows for JMS selectors on JMSCorrelationID as well as JMSXGroupID
and JMSXUserID along with some fixes to avoid an NPE case and fixes
to the conversion of AMQP MessageID and CorrelationID values when
doing cross protocol mappings. Adds new tests to cover more cases
of using the JMS selector with Qpid JMS and the AMQP test client.
add the adaptTransportConfiguration() method to the
ClientProtocolManagerFactory so that transport configurations used by
the ClientProtocolManager have an opportunity to adapt their transport
configuration.
This allows the HornetQClientProtocolManagerFactory to adapt the
transport configuration received by remote HornetQ broker to replace the
HornetQ-based NettyConnectorFactory by the Artemis-based one.
JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431
If message senders and receivers uses different
wireformat.tightEncodingEnabled options, broker will get marshalling
problem. This is because when openwire messages are converted to
core messages, and later these core messages converted to openwire
messages, the broker uses a mashaller that comes with the connection
used to carry the messages.
For example, if a producer sents a message using option "wireformat
.tightEncodingEnabled=false" and a receiver tries to receive it
using 'true' for the same option, it'll never get it because the
broker will fail to use a "tight encoding" marshaller to
decode a 'loose encoded' message.
To fix the problem, we always use 'tight encoding' for internal
message converters.
Remove some redundant code in the processing of the OpenWire Destination
name, both in the topic handler and in the method in general the
destination name only needs to be converted once and only one
SimpleString instance needs to be created from that which will then feed
all the places that either the String form or the SimpleString form of
the result is needed.
By default, every openwire connection will create a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If a openwire client is create temporary queues these queues
will fill up with messages for as long as the associated
openwire connection is alive. It appears these messages
do not get consumed from the queues.
The reason behind is that advisory messages don't require
acknowledgement so the messages stay at the queue.
Remove the unused transaction remove method as the getTransaction method
provides the remove operation and the remove method was doing an invalud
remove of wrong type from the transactions collection.
Added test case for cross protocol on JMSDeliveryMode proving issue, and asserting fix
Added fix to AmqpCoreConverter to ensure durability (JMSDeliveryMode) is retained.
Similar issue spotted with JMSPriority as with JMSDeliveyMode, fixing at the same time.
Added extra test case for jmspriority
Added fix for jmspriority
Avoid null checking each disposition before then checking the type, also
account for not knowing the type. Rearrange the handling code to
prioritize the most common case which is "Accepted"
Only reencode the Header on a Message when the redelivering the Message
to avoid overhead and unneeded modification to the original encoding of
the Header.
Fix for MQTT in connect due to deprecated methods bug causing NPE, call new methods and null check ourselves.
Also raised https://github.com/netty/netty/issues/7076 upstream but i guess we will keep this, as the old methods are deprecated anyhow.
delegate to the jdk saslServer. Allow acceptor configuration of supported mechanismis; saslMechanisms=<a,b>
and allow login config scope for krb5 to be configured via saslLoginConfigScope=x
On completion of drain the response is not flushed and the
client can wait a few seconds before another broker task
flushes the work. Flush the connection after updating the
linked as being drained. Also perform the work with the
connection lock held to prevent conccurent update of proton
state.
Add krb5sslloginmodule that will populate userPrincipal that can be mapped to roles independently
Generalised callback handlers to take a connection and pull certs or peerprincipal based on
callback. This bubbled up into api change in securitystore and security manager
Use AcitveMQDestination for subscription naming, fixing and aligning queue naming in the process.
The change is behind a configuration toggle so to avoid causing any breaking changes for uses not expecting.
Adds headers AMQ_SCHEDULED_DELAY and AMQ_SCHEDULED_TIME to STOMP
protocol handling to allow for delayed and scheduled time of a
message. The AMQ_SCHEDULED_DELAY brings forward the same option
from the 5.x broker and the AMQ_SCHEDULED_TIME option adds a fixed
time of delivery alternative to match that of AMQP and others.
Add test case, to prove the issue, and then obviously ensure it works, post fix.
Apply changes in logic of createQueueName to handle global better and fix the behaviour.
Create queues so names are same as behaviour with core client.
this will fix a few multiple protocol tests on ConsumerTests.
And a few other AMQP tests dealing with conversions.
You would get a classCastException without this commit.
If an error escapes into the event processing layer we close the
connection with an error condition to avoid the client becoming stuck on
waiting for a response from the broker and the broker side being in an
unknown state.
Multiplication operations where the operands have type `int` but the
result is cast to `long` may lead to overflow.
Fixes two instances of this problem, by ensuring the operands are cast
to `long` during multiplication.
This resolves the "Result of integer multiplication cast to long"
alerts at https://lgtm.com/projects/g/apache/activemq-artemis/alerts.
When creating some AMQP resources (senders, receivers, etc) the broker
can return an error of 'failed' instead of the security error that is
expected in these cases. In the case of a receiver being created and
a security error happening the broker fails to send back a response
causing the client to hang waiting for an attach response.
Broker should support full qualified queue names (FQQN)
as well as bare queue names. This means when clients access
to a queue they have two equivalent ways to do so. One way
is by queue names and the other is by FQQN (i.e. address::qname)
names. Currently only receiving is supported.
On link attach we currently default out SenderSettleMode to MIXED which
while legal doesn't truly reflect what the client asked for. We instead
now update the link to reflect the mode requested by the client
Also add some tests to ensure that we always return the
ReceiverSettleMode as FIRST since we don't support SECOND.
Broker should support full qualified queue names (FQQN)
as well as bare queue names. This means when clients access
to a queue they have two equivalent ways to do so. One way
is by queue names and the other is by FQQN (i.e. address::qname)
names. Currently only receiving is supported.
Resuse a single small buffer for all txn commands (declare / dischare) to
avoid creating lots of small arrays and ByteBuffer wrappers for txn operations.
Fix the getUserID and getTimestamp methods in AMQPMessage to read and
return the correct values. Adds some tests to cover these cases and
cleans up some others.
Ensure that the header value for priority is read and returned in a form
that is scaled such that it won't cause an IndexOutOfBoundsException
from the QueueImpl priority array. Adds some additional testing for
message priority support.
When populate-validated-user = true AMQP messages can cause exceptions.
This feature isn't particularly applicable to AMQP so this commit
eliminates the exception and leaves the AMQP messages untouched
even if populate-validated-user = true. In other words,
populate-validated-user + AMQP is not supported.
When I added flow control, some tests that were using reflection started to fail.
Also as a precaution I'm using <= on the flow control low credit check
When a message is sent to the broker with a TransactionState indicating
that the message should be included in a transaction the disposition from
the broker indicating acceptance of the message should be done using a
TransactionState value that contained the TX ID and the Accepted
disposition.
The coordinator needs to refill credit on the receiver once it has been
exhausted, otherwise the remote cannot send additional declare or
discharge commands to the broker.
As part of my refactoring on AMQP, the broker shouldn't rely on Application properties
for any broker semantic changes on delivery.
I am removing any access to those now, so we can properly deal with this post 2.0.0.
with this we could send and receive message in their raw format,
without requiring conversions to Core.
- MessageImpl and ServerMessage are removed as part of this
- AMQPMessage and CoreMessage will have the specialized message format for each protocol
- The protocol manager is now responsible to send the message
- The message will provide an encoder for journal and paging
When openwire sends back an exception response, it doesn't set
the correct correlation id. This causes the client to miss the
response and the exception won't get caught.
To fix it we need to add the correlation id before sending.
When sending an empty ObjectMessage, broker doesn't
write a 'length' field to the message buffer. In delivery
the broker tries to read the length from the buffer, which
causes "IndexOutOfBoundsException".
To fix it, we need to check if the buffer is empty or not,
and only read it if the buffer is not empty.
When a producer sends a messages to a temp destination created from
another connection, it fails. The reason behind it is that the
producer's connection didn't receive the advisory message (notification)
from broker about this temp destination, and it will throw an exception
if it doesn't know this temp destination.
The fix is send the advisory to the client so that it knows this destination.
When creating a 'no-local' openwire consumer, it doesn't work,
meaning it can still receive messages from the same connection.
The fix is similar to what Artemis client does, which is adding
a 'filter' to the consumer/subscription.
The difference is that with OpenWire we have to do it on the
broker side.
Adds tests for handling of Rejected, Released and Modified outcomes for
a delivery sent to a receiver. Tests show that for the Modified outcome
the broker is redelivering the message to the same receiver when the
undeliverable here value is set which violates the AMQP 1.0 specified
handling of that field. Also for Rejected outcome the broker should
be sending the rejected message to the DLQ as Rejected is supposed to
be a terminal outcome.
Small fix included to not adjust the delivery count if the Modified
outcome does not indicate that the delivery failed.
If the SASL plain mechanism arrives with the authzid value set the
mechanism needs to account for its presence and use the correct fields
of the exchange to get the username and password values. Adds some
tests to validate this fix.
The broker needs to return only the filters that are supported on a
receiver attach otherwise the remote is not aware that the broker is not
able to honor the requested configuration of the receiver.
Refactor the AMQP Message transformers both for better performance and
also to fix a number of issues with the transformers creating inbound
and outbound messages with incorrectly mapped values or extra data
appended where it should not be.
Removes the append of the destination annotations from the outbound JMS
transformer as the legacy client has be unsupported at Qpid for quite
some time now.
Since we don't need client implementations any longer, given the maturity level of
qpid jms, these classes can go, as a result a lot of the interfaces can be removed.
As part of this I am removing proton-plug, and reorganizing the packages in a way I think it
makes more sense and easier to other developers to understand and maintain it.
https://issues.apache.org/jira/browse/ARTEMIS-751
max-disk-usage = how much of a disk we can use before the system blocks
global-max-size = how much bytes we can take from memory for messages before we start enter into the configured page mode
This will also change the default created configuration into page-mode as that's more reliable for systems.