This reverts commit dbb3a90fe6.
The org.apache.activemq.artemis.core.server.Queue#getRate method is for
slow-consumer detection and is designed for internal use only.
Furthermore, it's too opaque to be trusted by a remote user as it only
returns the number of message added to the queue since *the last time
it was called*. The problem here is that the user calling it doesn't
know when it was invoked last. Therefore, they could be getting the
rate of messages added for the last 5 minutes or the last 5
milliseconds. This can lead to inconsistent and misleading results.
There are three main ways for users to track rates of message
production and consumption:
1. Use a metrics plugin. This is the most feature-rich and flexible
way to track broker metrics, although it requires tools (e.g.
Prometheus) to store the metrics and display them (e.g. Grafana).
2. Invoke the getMessageCount() and getMessagesAdded() management
methods and store the returned values along with the time they were
retrieved. A time-series database is a great tool for this job. This is
exactly what tools like Prometheus do. That data can then be used to
create informative graphs, etc. using tools like Grafana. Of course, one
can skip all the tools and just do some simple math to calculate rates
based on the last time the counts were retrieved.
3. Use the broker's message counters. Message counters are the broker's
simple way of providing historical information about the queue. They
provide similar results to the previous solutions, but with less
flexibility since they only track data while the broker is up and
there's not really any good options for graphing.
When performing concurrent user admin actions (e.g. resetUser, addUser,
removeUser on ActiveMQServerControl) when using the
PropertiesLoginModule with reload=true the underlying user and role
properties files can get corrupted.
This commit fixes the issue via the following changes:
- Add synchronization to the management commands
- Add concurrency controls to underlying file access
- Change CLI user commands to use remote methods instead of modifying
the files directly. This avoids potential concurrent changes. This
change forced me to modify the names of some of the commands'
parameters to disambiguate them from connection-related parameters.
In a cluster scenario where non durable subscribers fail over to
backup while another live node forwarding messages to it,
there is a chance that the the live node keeps the old remote
binding for the subs and messages go to those
old remote bindings will result in "binding not found".
The default JAAS security manager doesn't need the address/FQQN for
authorization, but I'm putting it back into the interface because there
are other use cases which *do* need it.
Both authentication and authorization will hit the underlying security
repository (e.g. files, LDAP, etc.). For example, creating a JMS
connection and a consumer will result in 2 hits with the *same*
authentication request. This can cause unwanted (and unnecessary)
resource utilization, especially in the case of networked configuration
like LDAP.
There is already a rudimentary cache for authorization, but it is
cleared *totally* every 10 seconds by default (controlled via the
security-invalidation-interval setting), and it must be populated
initially which still results in duplicate auth requests.
This commit optimizes authentication and authorization via the following
changes:
- Replace our home-grown cache with Google Guava's cache. This provides
simple caching with both time-based and size-based LRU eviction. See more
at https://github.com/google/guava/wiki/CachesExplained. I also thought
about using Caffeine, but we already have a dependency on Guava and the
cache implementions look to be negligibly different for this use-case.
- Add caching for authentication. Both successful and unsuccessful
authentication attempts will be cached to spare the underlying security
repository as much as possible. Authenticated Subjects will be cached
and re-used whenever possible.
- Authorization will used Subjects cached during authentication. If the
required Subject is not in the cache it will be fetched from the
underlying security repo.
- Caching can be disabled by setting the security-invalidation-interval
to 0.
- Cache sizes are configurable.
- Management operations exist to inspect cache sizes at runtime.
This is allowing journal appends to happen in burst
during replication, by batching replication response
into the network at the end of the append burst.
I couldn't reproduce this with a test, but static code analysis led me
to this solution which is similar to the fix done for ARTEMIS-2592 via
e397a17796.
Due to a lack of concurrency protections it's possible to create a
consumer on a closed session. I've not been able to reproduce this with
a test, but I've seen it in the wild. Static code analysis points to a
need for better concurrency controls around closing the session and
creating consumers.
This is almost a NO-JIRA, but since I'm fixing ManagementServiceImplTest.testGetResources broken after this
I'm commiting this change associated with ARTEMIS-2828
Since getDiskStoreUsage on the ActiveMQServerControl is converting a
double to a long the value is always 0 in the management API. It should
return a double instead.
Adding this metric required moving the meter registration code from the
AddressInfo class to the ManagementService in order to get clean access
to both the AddressInfo and AddressControl classes.
The calculation used by
ActiveMQServerControlImpl.getDiskStoreUsagePercentage() is incorrect. It
uses disk space info with global-max-size which is for address memory.
Also, the existing getDiskStoreUsage() method *already* returns a
percentage of total disk store usage so this method seems redundant.
Now it is possible to reset queue parameters to their defaults by removing them
from broker.xml and redeploying the configuration.
Originally this PR covered the "filter" parameter only.
ORIG message propertes like _AMQ_ORIG_ADDRESS are added to messages
during various broker operations (e.g. diverting a message, expiring a
message, etc.). However, if multiple operations try to set these
properties on the same message (e.g. administratively moving a message
which eventually gets sent to a dead-letter address) then important
details can be lost. This is particularly problematic when using
auto-created dead-letter or expiry resources which use filters based on
_AMQ_ORIG_ADDRESS and can lead to message loss.
This commit simply over-writes the existing ORIG properties rather than
preserving them so that the most recent information is available.
- when sending messages to DLQ or Expiry we now use x-opt legal names
- we now support filtering thorugh annotations if using m. as a prefix.
- enabling hyphenated_props: to allow m. as a prefix
DivertBindings are now properly cleaned up when a queue binding is
removed that matches the divert. The correct key is now used to remove
the queue address from the set and the correct address is now used to
remove the remote consumer.
This one should improve eventual failures on ClusterConnectionControlTest
to validate this, run ClusterConnectionControlTest::testNotifications in a loop
you will see eventually the test taking longer to shutdown the executor as the call could be blocked.
This won't be an issue on a real server (Production system)
however, on the testsuite or while embedded this could cause issues,
if a same instance is stopped then started.
This is the reason why BackupAuthenticationTest was intermittently failing.
I also adapted the test since I would need to stop the server and reactivate it in order to change the configuration.
The previous test wasn't acting like a real server.
Currently when the broker hits the common 'Address already in use' issue
when starting the process terminates with a vague exception. The user is
left to guess which acceptor actually failed. If the broker has lots of
acceptors it is a tedious process to identify the problematic
configuration. This commit adds details to the exception message about
which acceptor failed and which host:port it was attempting to bind to.
This commit does the following:
- Deprecates existing overloaded createQueue, createSharedQueue,
createTemporaryQueue, & updateQueue methods for ClientSession,
ServerSession, ActiveMQServer, & ActiveMQServerControl where
applicable.
- Deprecates QueueAttributes, QueueConfig, & CoreQueueConfiguration.
- Deprecates existing overloaded constructors for QueueImpl.
- Implements QueueConfiguration with JavaDoc to be the single,
centralized configuration object for both client-side and broker-side
queue creation including methods to convert to & from JSON for use in
the management API.
- Implements new createQueue, createSharedQueue & updateQueue methods
with JavaDoc for ClientSession, ServerSession, ActiveMQServer, &
ActiveMQServerControl as well as a new constructor for QueueImpl all
using the new QueueConfiguration object.
- Changes all internal broker code to use the new methods.
Due to the changes in 6b5fff40cb the
config parameter message-expiry-thread-priority is no longer needed. The
code now uses a ScheduledExecutorService and a thread pool rather than
dedicating a thread 100% to the expiry scanner. The pool's size can be
controlled via scheduled-thread-pool-max-size.
This appears to have been added to the code-base by mistake over 10
years ago. It seems related to debugging and I can't see anywhere where
it is actually used so I'm removing it.
Using a property on AMQPLargeMessage instead of a ThreadLocal
This was causing issues on the journal as the message may transverse different threads on the journal.
There is no guarantee that the encodeSize size is the same in AMQP right after read.
As the protocol may add additional bytes right after decoded such as header, extra properties.. etc.
This is a Large commit where I am refactoring largeMessage Body out of CoreMessage
which is now reused with AMQP.
I had also to fix Reference Counting to fix how Large Messages are Acked
And I also had to make sure Large Messages are transversing correctly when in cluster.
- Avoid some Properties Decoding, checking if we need certain properties like scheduled delivery
- Avoid creating some unnecessary SimpleString instances
- Removed some intermediate ActiveMQBuffer allocation
- Removed some intermediate UnreleasableByteBuf allocation
Adding serialVersionUID to WildcardConfiguration since it is
serializable. Without serialVersionUID being specified a new one is
generated on each compilation which prevents object deserialization
between releases.
This is to make it possible to identify what test is leaking files whenever that is happening.
That is because future tests will report the leaks, and it's difficult to identify where it happened.
Also i'm changing NoProcessFilesBehind to show the getOpenFD propertly
Large messages can be split up using Websocket Continuation Frames.
This allows for much smaller buffer sizes to send or receive
potentially very large messages.
There is an optimization in AMQP, that properties are only parsed over demand.
It happens that after ARTEMIS-2294 (commit 2dd0671698),
every send would request for the property on the message, resulting the properties to always be parsed upon send.
Even when there's no use of application properties.
This is a surprisingly large change just to fix some log messages, but
the changes were necessary in order to get the relevant data to where it
was being logged. The fact that the data wasn't readily available is
probably why it wasn't logged in the first place.
Removed unused private constant FLUSH_TIMEOUT.
Solved some compiler warnings (missing generic type, private class can
be final). Fixed a performance related sevntu checkstyle warning
"Variable 'xyz' can be moved inside the block at line '2,864' to
restrict runtime creation.".
Add a paged message to the tail, when the QueueIterateAction doesn't handle it, to avoid removing unhandled paged message. Move the refRemoved calls from the QueueIterateActions to the iterQueue to fix the queue stats.
If a broker loses its file lock on the journal and doesn't notice (e.g.
network connection failure to an NFS mount) then it can continue to run
after its backup activates resulting in split-brain.
This commit implements periodic journal lock evaluation so that if a live
server loses its lock it will automatically restart itself.
The PageSubscriptionImpl.cleanupEntries could be locked by the queue
depage because they are executed with the same executor and the depage
could be locked by the iterQueue.
If PageSubscriptionImpl.cleanupEntries is locked, no one clean up the
JournalRecord and PagePositionImpl instances created during iterQueue.
So removing all messages from a huge queue, causes the retention of too
JournalRecord and PagePositionImpl instances until an OOM.
To avoid to lock the PageSubscriptionImpl.cleanupEntries the depage is
executed only if the queue isn't iterating.
This commit introduces the ability to configure a downstream connection
for federation. This works by sending information to the remote broker
and that broker will parse the message and create a new upstream back
to the original broker.
Add the config parameter `page-sync-timeout` to set a customized value,
because if the broker is configured to use ASYNCIO journal, the timeout
has the same value of NIO default journal buffer timeout ie 3333333.
The iterQueue transaction commits are locked by the synchronization
context. So removing all messages from a huge queue causes the creation
of too locked transactions for the paged messages and so OOM.
The iteration on paged message is executed out the iterQueue
synchronization context to avoid to lock the transaction commits.
Active Directory servers are unable to handle referrals automatically.
This causes a PartialResultException to be thrown if a referral is
encountered beneath the base search DN, even if the LDAPLoginModule is
set to ignore referrals.
This option may be set to 'true' to ignore these exceptions, allowing
login to proceed with the query results received before the exception
was encountered.
Note: there are no tests for this change as I could not reproduce the
issue with the ApacheDS test server. The issue is specific to directory
servers that don't support the ManageDsaIT control such as Active
Directory.
A new feature to preserve messages sent to an address for queues that will be
created on the address in the future. This is essentially equivalent to the
"retroactive consumer" feature from 5.x. However, it's implemented in a way
that fits with the address model of Artemis.
Improve wildcard support for the key attribute in the roles access
match element and whitelist entry element, allowing prefix match for
the mBean properties.
In LargeMessageImpl.copy(long) it need to open the underlying
file in order to read and copy bytes into the new copied message.
However there is a chance that another thread can come in and close
the file in the middle, making the copy failed
with "channel is null" error.
This is happening in cases where a large message is sent to a jms
topic (multicast address). During delivery it to multiple
subscribers, some consumer is doing delivery and closed the
underlying file after. Some other consumer is rolling back
the messages and eventually move it to DLQ (which will call
the above copy method). So there is a chance this bug being hit on.
The crititical analyser trigger the broker shutdown if try to
removeAllMessages with a huge queue. The iterQueue is split so as
not to keep the lock too time.
The LocalMonitor tick log is very useful to establish a "heartbeat" log
statement. It is moved into its own logger from PagingManager logger,
which is too verbose to leave activated indefinitely in production.
After a node is scaled down to a target node, the sf queue in the
target node is not deleted.
Normally this is fine because may be reused when the scaled down
node is back up.
However in cloud environment many drainer pods can be created and
then shutdown in order to drain the messages to a live node (pod).
Each drainer pod will have a different node-id. Over time the sf
queues in the target broker node grows and those sf queues are
no longer reused.
Although use can use management API/console to manually delete
them, it would be nice to have an option to automatically delete
those sf queue/address resources after scale down.
In this PR it added a boolean configuration parameter called
cleanup-sf-queue to scale down policy so that if the parameter
is "true" the broker will send a message to the
target broker signalling that the SF queue is no longer
needed and should be deleted.
If the parameter is not defined (default) or is "false"
the scale down won't remove the sf queue.
By default, such a cancelled task is not automatically removed from the
work queue until its delay elapses. It may cause unbounded retention of
cancelled tasks. To avoid this, set remove on cancel policy to true.
The core server session tracks details about producers like what
addresses have had messages sent to them, the most recent message ID
sent to each address, and the number of messages sent to each address.
This information is made available to users via the
listProducersInfoAsJSON method on the various management interfaces
(JMX, web console, etc.). However, in situations where a server session
is long lived (e.g. in a pool) and is used to send to many different
addresses (e.g. randomly named temporary JMS queues) this info can
accumulate to a problematic degree. Therefore, we should limit the
amount of producer details saved by the session.
Certain devices or file systems won't support record level locking.
For that reason I am changing FileLockNodeManager to use separate files (one for each position) instead of using tryLock(position);
A good example for this would be cephFS where channel.tryLock or channel.tryLock works but it fails at a record level.
Wait netty event loop group shutdown to avoid too many opened FDs after
server stops, when netty configuration is used. Clear server
activateCallbacks to avoid reactivation of previous nodeManager and
consequent FD leaks on restart. Fix LargeServerMessageImpl.copy to avoid
FD leaks when a large message expiry or it is sent to DLA. Terminate
HawtDispatcher global queue to avoid pipes and eventpolls leaks after a
MQTT test.
cherry-picking commit 9617058ba0649af4eea15ce8793f86de827c4b7f
NO-JIRA adding check for open FD on the testsuite
cherry-picking commit 0facb7ddf4d3baa14a3add4290684aff7fd46053
NO-JIRA addressing connections leaks on integration tests
If a jms client (be it openwire, amqp, or core jms) receives a message that
is from a different protocol, the JMSMessageID maybe null when the
jms client expects it.
Add max record size check before adding a record to prevent that the
broker shuts down, when there is one really large header sent with the
message. Add message size check before allocating large message resource
if it can't be stored.
AbstractJournalStorageManager::performCachedLargeMessageDeletes
must enforce acquisition of manager write lock (as documented)
to avoid unlucky racing calls of stopReplication while stopping
to deadlock.
FileConfigurationParserTest was creating a data folder.
This is simply disabling persistence from the configuration used by the server on this test as it is not needed.
There are a few issues with prefixing and compatibility.
This is basically an issue when integrated with Wildfly or any other case
where prefix is activated
and playing with older versions.
Page::read is allocating a new ChannelBufferWrapper on each
paged message read: to reduce the allocation rate, it could be
reused until a new wrapped ByteBuffer is created
This test has been failing as part of the main testsuite
and it should really be a smoke test as it is using a real test.
so, I'm moving it as smoke-test
This is fixing these tests:
- org.apache.activemq.artemis.tests.integration.paging.PagingOrderTest#testPagingOverCreatedDestinationQueues
- org.apache.activemq.artemis.tests.integration.paging.PagingOrderTest#testPagingOverCreatedDestinationTopics
No additional tests are needed as this change is covereted by the current testsuite
Historically the broker has read the XML configuration file as a String,
substituted system properties, and then parsed that String into an XML
document. However, this method won't substitute system properties in the
files which are imported via xinclude. In order to substitue system
properties in xincluded files the substitution needs to be performed
after the file is parsed into an XML document. This commit implements
that change and refactors the XMLUtil class a bit to eliminate redundant
code, obsolete comments, etc.
When auto-creation is off then older clients consuming messages from an
FQQN won't work. This commit fixes that problem and adds a compatibility
test to verify.
The changes from ARTEMIS-2189 mean that
o.a.a.a.c.s.i.ServerSessionImpl#deleteQueue
is no longer called from the same ServerSessionImpl instance that
created it which means that TempQueueCleanerUpper instances will leak.
To resolve the leak the client will only create a new session when
necessary instead of every time delete() is invoked.
Implement using the ActiveMQ5 JMSXGroupFirstForConsumer, property as default, but make it possible for future to make it configurable easily. (Not this PR)
Add test
In adding auto-delete queue level feature, its been noticed as some feature bits were added during hot fix branch, that there's api break with the 2.6.x hotfix branch.
This addresses that by fixing this in 2.7.x
LocalMonitor::under on PagingManagerImpl won't log anymore with a
warning message if the producers got unblocked and with info
if disk it getting freed
Performing direct deliveries of management messages could enter
a code path on QueueImpl::addTail with a NULL pageIterator: performing
a null check will avoid it to throw NPE.
The Audit log allows user to log some important actions,
such as ones performed via management APIs or clients,
like queue management, sending messages, etc.
The log tries to record who (the user if any) doing what
(like deleting a queue) with arguments (if any) and timestamps.
By default the audit log is disabled. Through configuration can
be easily turned on.
Multiple consumers using the same clientId in the cluster, the last consumer connection should close the previous consumer connection!
ARTEMIS-2226 last consumer connection should close the previous consumer connection
to address apache-rat-plugin:0.12:check
ARTEMIS-2226 last consumer connection should close the previous consumer connection
to address checkstyle
ARTEMIS-2226 last consumer connection should close the previous consumer connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer connection
adjust the code structure
ARTEMIS-2226 last consumer connection should close the previous consumer connection
add javadoc
Direct and async deliveries lock QueueImpl::this and
ServerConsumerImpl::this in different order causing deadlock:
has been introduced a deliverLock to prevent both type of delivers
to concurrently happen, making irrelevant the lock ordering.
Added test reproducer and changed Queue::isDurableMessage usages into
Queue::isDurable to allow acks to hit the journal and being
correctly replicated across nodes.
Add ability to configure when creating auto created queues at the queue level
Add support for configuring message count check
Add test cases
Update docs
Support using group buckets on a queue for better local group scaling
Support disabling message groups on a queue
Support rebalancing groups when a consumer is added.
This is simply fixing the example under examples/features/standard/divert
Other tests are passing.
No additional tests are needed as the example on this case acts like a test.
Push isDirectDeliver method from netty impl, to the Connection interface
Add support to InVMConnection for isDirectDeliver flag and ability to set via config, defaulting to false, to keep current default behavior.
Extend DirectDeliverTest to check InVM as well.
Any checkProperties();<usage of this.properties> pattern has been
replaced by an atomic checkProperties().<usage of returned properties>
to help both performance and consistency.
The cleanup is now performed into CoreTypedProperties both
for performance reasons (avoid lock/unlock many times)
and consistency, given that the operation is now atomic.
In Page.write(final PagedMessage message) if the page file is closed
it returns silently. The caller has no way to know that if the message
is paged to file or not. It should throw an exception so that the
caller can handle it correctly.
This causes random failure PagingTest#testExpireLargeMessageOnPaging().
The test shows that when the server stops it closes the page file.
In the mean time a message is expired to the expiry queue and if
the expiry queue is in paging mode, it goes to Page.write() and
returns without any error. The result is that the message is removed
from the original queue and not added to the expiry queue.
If we throw exception here it makes the expiration failed, the message
will not be removed from the orginal queue. Next time broker is started,
the message will be reloaded and expired again. no message lost.
Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs
There's a *slight* semantic change with the behavior of the queue query
and binding query to make them consistent with the address query, namely
that they will return the name of the queue and the name of the address
in every case and the returned names will be not use the FQQN syntax but
will be parsed to reflect their actual names in the broker.
There were two different but nearly identical implementations of
createQueue(). I consolidated these into a single method. There should
be no semantic differences.
When trying to get the bindings for an address the getBindingsForAddress
method will create a Bindings instance if there are no bindings for the
address. This is unnecessary in most circumstances so use the
lookupBindingsForAddress method instead and check for null.