Allow for configuration of the batch size granted to the remote when an
AMQP federation queue receiver is pulling messages only when there is
local capacity to handle them. Some code housekeeping is done here to
make adding future properties a bit simpler and require fewer changes.
Create a new NettyConnector for each connection attempt that is configured from
distinct broker connection URIs which allows for differing TLS configuration
per remote connection configuration.
Fix the AMQP message scanning to account for the header not being at the
front of the buffer which also accounts for odd case of broker storing
message with delivery annotations ahead of the header.
The local variable constructorPos is initialized with the value of
buffer.position() in line 330, but the variable itself is not used
anywhere else, which may indicate an error.
This is particularly true for the Mirrored SNF queue. Redistribution is not meant for internal queues. If an internal queue happens to have the same name on another server, it should not trigger redistribution when consumers are removed.
It would be possible to work around this by adding an address-setting specific to the address with redistribution disabled.
ClusteredMirrorSoakTest was intermittently failing because of this. For a few seconds while the mirror connection is still being made connections could move messages from one node towards another node if both have the same name.
When federation is configured in two directions between nodes for an address
the message can reflect from one node to another if max hops is not set or not
set correctly and in some federation topologies the max hops value can't solve
the issue and still result in a working configuration. This reflection should
be prevented at the federation consumer level for address consumers.
When Queue consumers attach with filters use those instead of the Queue
filter to filter the messages that are federated to avoid stranding of
messages on the local broker. This will result in multiple federation
consumers if the various attached local consumers all use different
filters but does keep unwanted messages on the remote so that consumers
there can consume those.
Under some scenarios federation demand tracking is losing track of total demand
for a federated resource leading to teardown of federated links before all local
demand has been removed from the resource. This occurs most often if the attempts
to establish a federation link are refused because the resource hasn't yet been
created and an eventual attach succeeds, but can also occur in combination with
a plugin blocking or not blocking federation link creation in some cases.
When an AMQP federation instance attempts to federate an address or queue
it can fail if the remote address or queue is not present or cannot be
created based on broker policy. A federation link can also closed if the
federated resource is removed from the remote broker by management etc.
In those cases the remote broker should note the resources that were
targets of federation and send alerts to the source federation broker to
notify it that these resources become available for federation and the
source should attempt again to create federation links if demand still
exists. This allows an AMQP federation instance to heal itself based on
updates from the remote.
Allows the configuration of AMQP Federation broker connections to be updated and
reloaded. This allows for update, add or remove of AMQP federation broker connections
as well as the basic AMQP sender and receiver broker connections. It checks for and
ignores changes in AMQP broker connections that are performing Mirroring as that
would lead to issues that can break mirroring.
When initially developed the expectation was that no more producers would keep connecting but in a scenario like this
the consumers could actually give up and things will just accumulate on the server.
We should cleanup these upon disconnect.
Mirror acks should be performed atomically with the storage of the source ACK. Both the send of the ack and the recording of the ack should be part of the same transaction (in case of transactional).
We are also adding support on transactions for an afterWired callback for the proper plug of OperationContext sync.
This commit does the following:
- Replaces non-inclusive terms (e.g. master, slave, etc.) in the
source, docs, & configuration.
- Supports previous configuration elements, but logs when old elements
are used.
- Provides migration documentation.
- Updates XSD with new config elements and simplifies by combining some
overlapping complexTypes.
- Removes ambiguous "live" language that's used with regard to high
availability.
- Standardizes use of "primary," "backup," "active," & "passive" as
nomenclature to describe both configuration & runtime state for high
availability.
Allow for core messages to be tunneled over broker connection links used
for AMQP Federation and for broker mirroring. This eliminates the need to
convert from Core to AMQP and from loading core large messages fully into
memory for that conversion.
This commit contains the following changes:
- eliminate used, undeclared dependencies
- eliminate unused, declared dependencies
- fix scope for test dependencies
- eliminate org.hamcrest completely as its use involved deprecated code
as well as dependencies from multiple versions
When resource audit logging is enabled STOMP is completely inoperable
due to an NPE during the protocol handshake. Unfortunately the failure
is completely silent. There are no logs to indicate a problem.
This commit fixes this problem via the following changes:
- Mitigate the original NPE via a check for null
- Move the logic necessary to set the "protocol connection" on the
"transport connection" to a class shared by all implementations.
- Add exception handling to log failures like this in the future.
- Add tests to ensure the audit logging is correct.
During redistribution, we should not copy all message annotations.
In particular we should not copy any of the x-opt-ORIG annotations used on DLQ and other copies.
this was broken after f632e8104b (ARTEMIS-3833 Preserve JMSCorrelationID of distributed AMQP large messages)
The change preserved too much, and as a result of that AmqpLargeMessageRedistributionTest::testSendMessageToBroker0GetFromBroker2 is intermittently failing.
There is no test in this commit as this is fixing AmqpLargeMessageRedistributionTest
When sending, for example, to a predefined anycast address and queue
from a multicast (JMS topic) producer, the routed count on the address
is incremented, but the message count on the matching queue is not. No
indication is given at the client end that the messages failed to get
routed - the messages are just silently dropped.
Fixing this problem requires a slight semantic change. The broker is now
more strict in what it allows specifically with regards to
auto-creation. If, for example, a JMS application attempts to send a
message to a topic and the corresponding multicast address doesn't exist
already or the broker cannot automatically create it or update it then
sending the message will fail.
Also, part of this commit moves a chunk of auto-create logic into
ServerSession and adds an enum for auto-create results. Aside from
helping fix this specific issue this can serve as a foundation for
de-duplicating the auto-create logic spread across many of the protocol
implementations.
- activemq.notifications are being transferred to the target node, unless an ignore is setup
- topics are being duplicated after redistribution
- topics sends are being duplicated when a 2 node cluster mirrors to another 2 node cluster, and both nodes are mirrored.
- interrupted message breaking reference counting
After the server writing to the client is interrupted in AMQP, the reference counting was broken what would require the server restarted
in order to cleanup the files of any interrupted sends.
- Removed consumer during large message delivery damaging large messages
If the consumer failed to deliver messages for any reason, the message on the queue would be duplicated. what would wipe out the body of the message
and other journal errors would happen because of this.
extra debug capabilities added into RefCountMessage as part of ARTEMIS-4206 in order to identify these issues
This fix is scanning journal and paging for existing large messages. We will remove any large messages that do not have a corresponding record in journals or paging.
there are two leaks here:
* QueueImpl::delivery might create a new iterator if a delivery happens right after a consumer was removed, and that iterator might belog to a consumer that was already closed
as a result of that, the iterator may leak messages and hold references until a reboot is done. I have seen scenarios where messages would not be dleivered because of this.
* ProtonTransaction holding references: the last transaction might hold messages in the memory longer than expected. In tests I have performed the messages were accumulating in memory. and I cleared it here.
For pipelined open cases the events processing should ignore additional begin
and attach events if the open event handler closes the connection to avoid the
processing throwing additional exceptions and replacing the error condition in
the connection with an unrelated error about NPE from the additional events.
If the client is using address prefixes to define the routing type along with
durable subscriptions then on re-attach the compairon to check if the subscription
address has changed needs to remove the prefix when comparing against the address
since the prefix isn't propagated when creating the address and will always fail
resulting in the subscription queue being deleted in error.
When an AMQP client subscribes to a new address (non-existing) with a receiver link, the
address is created with routing type ANYCAST regardles of the default address creation
configuration of the broker, and ignores even the broker wide default of MULTICAST.
I am adding an option sync=true or false on mirror. if sync, any client blocking operation will wait a roundtrip to the mirror
acting like a sync replica.
Attempt to standardize all Logger declaration to a singular variable name
which makes the code more consistent and make finding usages of loggers in
the code a bit easier.
Logger statements should use formatting syntax and let the normal framework checks take care of
checking if a logger is enabled instead of string concats and isXEnabled logger checks except
in cases there is known expense to the specifc logging message/arg preparation or passing.
Changes from myself and Robbie Gemmell.
Co-authored-by: Robbie Gemmell <robbie@apache.org>
If an AMQP consumer tries to receive a message and the broker is unable
to convert the message from core to AMQP then the consumer is
disconnected and the offending message stays in the queue. When the
consumer reconnects the conversion error will happen again resulting in
a loop that can only be resolved through administrative action (e.g.
deleting the message manually or sending it to a dead letter address).
This commit fixes that problem by detecting the conversion problem and
sending the message to the queue's dead letter address. It also doesn't
disconnect the consumer.
This commit also changes the log messages associated with sending a
message to the dead letter address since this event can now occur
regardless of the delivery attempts.