When an AMQP message is sent over a cluster bridge it is embedded into a
Core message. If the size of the AMQP message is barely beneath the
minLargeMessageSize then the Core message in which the AMQP message is
embedded will become a large message. The on the bridge target when the
embedded AMQP message is extracted from the large Core message it will
not be considered "large." In this situation the file for the large Core
message will leak.
Thanks to Erwin Dondorp for the test. I renamed and refactored it a bit,
but the fundamentals came from Erwin.
For embedded use-cases the Micrometer MeterRegistry may be passed in
from the application and used for other meters unrelated to the broker.
Therefore, the broker should not change the config of the MeterRegistry
(e.g. by adding common tags to all meters) as the config change(s) may
be incompatible with the needs of the embedding application.
The class org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection in ActiveMQ Artemis
is based on org.apache.activemq.broker.TransportConnection from ActiveMQ Classic,
and in the latter there is null check that doesn't exist in the former.
Therefore, I think it's worth adding this check.
The info should never be null since it is passed in off the wire from the OpenWire marshaller,
and state.getSessionIds() should also never return null because the underlying ConcurrentMap is initialized when ConnectionState is created.
Allow the client ID to be configured on normal bridge as well as
cluster-connection bridges. This makes the bridge connection easier to
identify on the target broker.
When using the replay functionality the application of filters to
the replayed messages fails to match against AMQP messages due to the
message not getting scanned when some message values are accessed.
Before the changes in 15dd24754a temporary
web resources could proliferate and consume inordinate amounts of disk
space because their directory names were generated uniquely every time
Jetty was started. However, now that they are deterministic no
proliferation is possible. Jetty will create the directories when it
starts, remove them when it stops, and if it fails to clean-up on
shutdown (e.g. crash from OOME) it will clean-up and recreate them when
it starts.
Therefore, our own house-keeping of those directories is no longer
needed and, in fact, causes problems. For example, when executing the
`restartEmbeddedWebServer` management operation the temp web resources
will actually be removed inadvertently causing the web console to fail.
This commit removes the web temp house-keeping logic as well as the
related tests. It also modifies & adds tests to ensure Jetty does this
house-keeping on its own.
Ensure that on server restart the original priority value assigned to an
AMQP message is used when dispatching durable messages from the store.
The AMQP Header section is scanned if present and the priority value
is recovered in an efficient manner.
This is a small usability improvement for management whereby
invocations of some operations no longer require JSON boilerplate. It
impacts the following operations on the ActiveMQServerControl:
- listConnections
- listSessions
- listAddresses
- listQueues
- listConsumers
- listProducers
AckManager.flush would hold a lock on ackManager, There was a possible deadlock with MirrorTarget:
Thread 1:
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.addRetry(AckManager.java:393)
- waiting to lock <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.ack(AckManager.java:418)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.performAck(AMQPMirrorControllerTarget.java:479)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.postAcknowledge(AMQPMirrorControllerTarget.java:461)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.actualDelivery(AMQPMirrorControllerTarget.java:318)
at org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver.onMessageComplete(ProtonAbstractReceiver.java:361)
Thread 2:
at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
- parking to wait for <0x000000079de0af38> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:1079)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:1369)
at java.util.concurrent.CountDownLatch.await(java.base@11.0.8/CountDownLatch.java:278)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.flush(AMQPMirrorControllerTarget.java:230)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager$$Lambda$601/0x00000008005c3040.accept(Unknown Source)
at java.lang.Iterable.forEach(java.base@11.0.8/Iterable.java:75)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.flushMirrorTargets(AckManager.java:184)
- locked <0x00000007990a13e8> (a org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager)
at org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager.initRetry(AckManager.java:162)
do not override the configured netty leak detection from client code.
Use the same logic as server, disabling the default only if no
properties are configured in the JVM.
See 127ce3a84a
Send operations should ignore replication, while the ack of the message should wait a round trip in replication.
That will allow us to ack the message faster and still have consistency with its replica.