Fix the AMQP message scanning to account for the header not being at the
front of the buffer which also accounts for odd case of broker storing
message with delivery annotations ahead of the header.
The local variable constructorPos is initialized with the value of
buffer.position() in line 330, but the variable itself is not used
anywhere else, which may indicate an error.
In 1st branch (line 274) of the if() statement, the
Boolean.parseBoolean() method accepts the value
frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL), which is used in line
273.
In 2nd branch (line 276), the Boolean.parseBoolean() method accepts the
value frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL), although the
other value frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)
is used.
Found by Linux Verification Center (portal.linuxtesting.ru) with SVACE.
Author A. Slepykh.
This is particularly true for the Mirrored SNF queue. Redistribution is not meant for internal queues. If an internal queue happens to have the same name on another server, it should not trigger redistribution when consumers are removed.
It would be possible to work around this by adding an address-setting specific to the address with redistribution disabled.
ClusteredMirrorSoakTest was intermittently failing because of this. For a few seconds while the mirror connection is still being made connections could move messages from one node towards another node if both have the same name.
When federation is configured in two directions between nodes for an address
the message can reflect from one node to another if max hops is not set or not
set correctly and in some federation topologies the max hops value can't solve
the issue and still result in a working configuration. This reflection should
be prevented at the federation consumer level for address consumers.
Generate MQTT message IDs from full allowed range of 1-65535 and skip
currently used values. Do not use atomic integer for current ID, because
all accesses and modifications are performed in synchronized context.
When Queue consumers attach with filters use those instead of the Queue
filter to filter the messages that are federated to avoid stranding of
messages on the local broker. This will result in multiple federation
consumers if the various attached local consumers all use different
filters but does keep unwanted messages on the remote so that consumers
there can consume those.
Under some scenarios federation demand tracking is losing track of total demand
for a federated resource leading to teardown of federated links before all local
demand has been removed from the resource. This occurs most often if the attempts
to establish a federation link are refused because the resource hasn't yet been
created and an eventual attach succeeds, but can also occur in combination with
a plugin blocking or not blocking federation link creation in some cases.
When an AMQP federation instance attempts to federate an address or queue
it can fail if the remote address or queue is not present or cannot be
created based on broker policy. A federation link can also closed if the
federated resource is removed from the remote broker by management etc.
In those cases the remote broker should note the resources that were
targets of federation and send alerts to the source federation broker to
notify it that these resources become available for federation and the
source should attempt again to create federation links if demand still
exists. This allows an AMQP federation instance to heal itself based on
updates from the remote.
Currently when an MQTT topic filter contains characters from the
configured wildcard syntax the conversion to/from this syntax breaks.
For example, when using the default wildcard syntax if an MQTT topic
filter contains a . the conversion from the MQTT wildcard syntax to the
core wildcard syntax and back will result in the `.` being replaced with
a `/.`.
This commit fixes that plus a few other things...
- Implements proper conversions to/from one WildcardConfiguration to
another.
- Refactors the MQTT code which invokes these conversion methods. This
includes simplifying a lot of test code.
- Adds lots of tests for everything.
- Clarifies some variable naming to better distinguish between core and
MQTT.
This commit:
- Eliminates MQTT session storage on every successful connection.
Instead data is only written when subsriptions are created or
destroyed.
- Adds a configuration property for the storage timeout.
- Updates the documentation with relevant information.
- Refactors a few bits of code to eliminate unnecessary variables, etc.
Change from forcing a session start cycle on each consumer add
event and start only those consumers that were added which will
trigger a prompt delivery action on each. The session should be
marked started on create to account for the remove of the start
on each consumer add event.
If the broker is embedded into a Jakarta environment then the existing
artemis-openwire-protocol module won't work because it uses javax
classes.
This commit adds a new Jakarta-specific module that can be used to
support OpenWire clients in Jakarta environments (e.g. Spring Boot 3).
Users will simply need to include this version on their classpath to
enable support.
Allows the configuration of AMQP Federation broker connections to be updated and
reloaded. This allows for update, add or remove of AMQP federation broker connections
as well as the basic AMQP sender and receiver broker connections. It checks for and
ignores changes in AMQP broker connections that are performing Mirroring as that
would lead to issues that can break mirroring.
When initially developed the expectation was that no more producers would keep connecting but in a scenario like this
the consumers could actually give up and things will just accumulate on the server.
We should cleanup these upon disconnect.
Mirror acks should be performed atomically with the storage of the source ACK. Both the send of the ack and the recording of the ack should be part of the same transaction (in case of transactional).
We are also adding support on transactions for an afterWired callback for the proper plug of OperationContext sync.
This commit does the following:
- Replaces non-inclusive terms (e.g. master, slave, etc.) in the
source, docs, & configuration.
- Supports previous configuration elements, but logs when old elements
are used.
- Provides migration documentation.
- Updates XSD with new config elements and simplifies by combining some
overlapping complexTypes.
- Removes ambiguous "live" language that's used with regard to high
availability.
- Standardizes use of "primary," "backup," "active," & "passive" as
nomenclature to describe both configuration & runtime state for high
availability.
Starting with 2.28.0, the broker doesn't translate the character `/` to
the configured wildcard delimiter (i.e. `.` by default) when creating
subscription queues for MQTT clients.
This commit fixes that regression and restores the proper translation.
Allow for core messages to be tunneled over broker connection links used
for AMQP Federation and for broker mirroring. This eliminates the need to
convert from Core to AMQP and from loading core large messages fully into
memory for that conversion.