This commit enforces a consistent style for parentheses, namely that
there is no padding. Futhermore, it updates all the code that violates
this styling so that the code is styled consistently across the entire
code-base.
This commit enforces a consistent style for commas, namely that a space
should follow a comma. Futhermore, it update all the code that violates
this styling up to date so that the code is styled consistently across
the entire code-base.
Messages that are in-flight on a link when the federation consumer is closed
due to removal of local demand might get enqueued on the local broker but the
disposition might not get sent to the remote leading to a release of the message
back to the source queue leading to a duplicate being left on the remote. The
AMQP receiver links need to be drained and any pending settlements need to be
sent before the link can be allowed to be detached.
Add basic management views for AMQP broker connections and implement control
types for AMQP federation features along with the broker connection management
views. Some initial work also to provide support for other broker connection
features to add management control and also plan for future views of incoming
broker connections and management of AMQP federation resources.
When the final frame of a large message is being written to the file in
the session thread and an IO error occurs such that that connection is
torn down, the large message reader can be closed before the message is
fully processed resulting in corruption. The large message file close
logic needs to occur on the session thread so that the processing of the
bytes can finish and the message gets added to the Queue and the close
can react by not deleting the file when it runs following the read task.
In order to better indicate their nature as broker feature specific queues
we can mark the temporary queues created for AMQP federation events and
control link messages as internal.
Prevents unintended link stealing scenarios when link names match for a new
link whose name is in use by another link still awaiting a detach response.
The broker uses unique IDs for logging statements. As logging changes
over time we need a way to "retire" these IDs (e.g. when certain
logging statements are no longer needed) to ensure they are not used
This commit does the following:
- Removes all the logging methods which are no longer used and
"retires" the corresponding ID (i.e. adds them to the `retiredIDs`
list for that LogBundle).
- Updates the validation for IDs that have been retired or are in
active use including a new suggestion about a valid ID to use when an
invalid ID is found.
- Fixes all the regular expressions in all the various uses of
`@LogBundle` to ensure there are no overlaps to prevent duplicates
between bundles.
Changes from myself and Robbie Gemmell (see PR). This closes#5303.
If an exception is thrown in the AMQP send path and there is an
active transaction we should mark that as rollback only so the
client will see an error when it tries to commit a transaction
that had a failed send.
When a bytes property is added to an AMQPMessage and it is then reencoded it
will fail without first wrapping the byte array in an AMQP binary as required
by the ApplicationProperties section specification defined type allowances.
There was already some verification at AMQPMirrorControllerSource::invalidTarget
however the verification failed on soak test ReplicatedBothNodesMirrorTest,
and an user I was working with also gave me evidence of this happening.
I'm improving the previous verification, which is actually a simplification that works on every case.
PriorityLinkedList has multiple sub-lists, before this commit PriorityLinkedList::setNodeStore would set the same node store between all the lists.
When a removeWithID was called for an item on list[0] the remove from list[4] would always succeed first. This operation would work correctly most of the time except
when tail and head is being used. Many NullPointerExceptions would be seen while iterating on the list for remove operations, and the navigation would be completely broken.
A test was added to PriorityLinkedListTest to make sure the correct lists were used however I was not able to reproduce the NPE condition in that test.
AccumulatedInPageSoakTest reproduced the exact condition for the NPE when significant load is used.
When using the replay functionality the application of filters to
the replayed messages fails to match against AMQP messages due to the
message not getting scanned when some message values are accessed.
Ensure that on server restart the original priority value assigned to an
AMQP message is used when dispatching durable messages from the store.
The AMQP Header section is scanned if present and the priority value
is recovered in an efficient manner.
AckManager.flush would hold a lock on ackManager, There was a possible deadlock with MirrorTarget:
Thread 1:
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.addRetry(AckManager.java:393)
- waiting to lock <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.ack(AckManager.java:418)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.performAck(AMQPMirrorControllerTarget.java:479)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.postAcknowledge(AMQPMirrorControllerTarget.java:461)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.actualDelivery(AMQPMirrorControllerTarget.java:318)
at org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver.onMessageComplete(ProtonAbstractReceiver.java:361)
Thread 2:
at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
- parking to wait for <0x000000079de0af38> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:1079)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:1369)
at java.util.concurrent.CountDownLatch.await(java.base@11.0.8/CountDownLatch.java:278)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.flush(AMQPMirrorControllerTarget.java:230)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager$$Lambda$601/0x00000008005c3040.accept(Unknown Source)
at java.lang.Iterable.forEach(java.base@11.0.8/Iterable.java:75)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.flushMirrorTargets(AckManager.java:184)
- locked <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.initRetry(AckManager.java:162)
Send operations should ignore replication, while the ack of the message should wait a round trip in replication.
That will allow us to ack the message faster and still have consistency with its replica.
If a user for some reason force closes the local mirror connection SNF
consumer or the actual connection but hasn't stopped the broker connection
itself the connection should recover and rebuild. The fix ensures that if
local connections are closed first then local session resources get freed.
When an address consumer explicitly closes or is closed we should remove the
address binding for that consumer right away instead of waiting for possible
configured auto delete as the demand is gone and we don't need to binding to
stick around any longer.
Say you use Mirroring and journal replication combined.
The target will wait a round trip on replica before sends are done.
It is possible to ignore that rountrip now with an option added into Configuration#mirrorReplicaSync
The federation sender links can react incorrectly when the source broker
closes a receiver because the demand is gone and they can result in the
remote broker closing its side of the connection causing the whole federation
to need to be rebuilt. Handle the closure events correctly to prevent an
unexpected close and rebuild.
When using targeted FQQN permissions the AMQP sender needs to check that
it can access not only the address but also the queue if sent an FQQN so
that the security can validate if the sender has been granted directed
access to the FQQN as a whole.
for regular messages it's quite obvious when the message is leaving the queue but for paged messages it becomes a challenge. We should just ignore the update for paged messages.
Handle any exceptions from the proton transport and set the error on the
transport for processing in the events dispatch cycle by adding in handling
of the transport error event.