Openwire clients create consumers to advisory topics to receive
notifications. As a result there are internal queues created
on advisory topics. Those consumer shouldn't be exposed via
management APIs which are used by the Console
To fix that the broker doesn't register any queues from
advisory addresses.
Also refactors a code to remove Openwire specific contants
from AddressInfo class.
I'm doing an overal improvement on large message support for AMQP
However this commit is just about a Bug on the converter.
It will be moot after all the changes I'm making, but I would rather keep this separate
as a way to cherry-pick on previous versions eventually.
This test starts 2 servers and send messages to
a queue until it enters into paging state. Then
it changes the address max-size to -1, restarts
the 2 servers again and consumes all the messages.
It verifies that even if the max-size has changed
all the paged messages will be depaged and consumed.
No stuck messages after restarting.
The tests is there to guard a case where messages
won't be depaged on server restart after the max-size
is changed to -1. This issue has been fixed into
master along with the fix for ARTEMIS-581, particularly
the changes to the method PagingStoreImpl.getMaxSize().
Server.stop is currently waiting completions on Sessions just because of test cases.
With the recent changes made into the Executors this is not needed any longer
Instead of flushing we just need to make sure there are no more calls into
page executors as we stop the PageManager.
This will avoid any possible starvations or deadlocks here.
It fixes the NPE on server start due to:
- missing SqlProviderFactory
- missing executor factory/scheduled pool (ie using exclusive scheduled pools)
It fixes the WARNINGS due to wrong slowness detection while renewing JdbcLeaseLock.
Openwire consumer is listed twice below "consumers" tab.
First it shows correctly the requested queue consume.
Second it shows consumer from multicast queue ActiveMQ.Advisory.
The second one is internal and should be hidden.
Update Tranformer to be able to handle initiation via propertiers (map<string, string>)
Update Configuration to have more specific transfromer configuration type, and to take properties.
Support back compatibility.
Add AddHeadersTransformer which is a main use case, and can act as example also.
Update Control's to expose new property configuration
Add test cases
Update examples for new transformer config style
- it is now possible to disable the TimedBuffer
- this is increasing the default on libaio maxAIO to 4k
- The Auto Tuning on the journal will use asynchronous writes to simulate what would happen on faster disks
- If you set datasync=false on the CLI, the system will suggest mapped and disable the buffer timeout
This closes#1436
This commit superseeds #1436 since it's now disabling the timed buffer through the CLI
add the adaptTransportConfiguration() method to the
ClientProtocolManagerFactory so that transport configurations used by
the ClientProtocolManager have an opportunity to adapt their transport
configuration.
This allows the HornetQClientProtocolManagerFactory to adapt the
transport configuration received by remote HornetQ broker to replace the
HornetQ-based NettyConnectorFactory by the Artemis-based one.
JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431
By default, every openwire connection will create a queue
under the multicast address ActiveMQ.Advisory.TempQueue.
If a openwire client is create temporary queues these queues
will fill up with messages for as long as the associated
openwire connection is alive. It appears these messages
do not get consumed from the queues.
The reason behind is that advisory messages don't require
acknowledgement so the messages stay at the queue.
Added integration test, to prove issue, and assert fix.
Fix PersistentQueueBindingEncoding to return value, not false.
Fix some method arg name to align with class interface arg name
With NFSv4 it is now necessary to lock/unlock the byte of the server
lock file where the state information is written so that the
information is then flushed to the other clients looking at the file.
The regular expressions for wildcard matching now properly respects the last
delimiter in a pattern and will not match a pattern missing the
delimiter by mistake
The timeout logic is changed to use System::nanoTime, less sensible to OS clock changes.
The volatile set on CriticalMeasure are changed with cheaper lazySet.
An Openwire connection creates an internal session used to track
transaction status, it doesn't have a session callback. When
the connection is closed, the core session should check if
callback is null to avoid NPE.
Add support to update Queue config via reload using existing updateQueue method at runtime.
Add/extend unit test cases to include testing reload of queue config.
Add new error in message bundle to include queue
update security check to support taking optional queue
update code that is operating on queues to pass the queue name during check so queue name could be in the error log if security issue.
There is a leak on replication tokens in the moment when a backup is
shutdowned or killed and the ReplicationManager is stopped. If there
are some tasks (holding replication tokens) in the executor, these
tokens are simply ignored and replicationDone method isn't called on
them. Because of this, some tasks in OperationContextImpl cannot be
finished.
Instead of wait to flush an executor,
I have added a method isFlushed() which will just translate to the
state on the OrderedExecutor.
In the case another executor is provided (for tests) there's a delegate
into normal executors.
delegate to the jdk saslServer. Allow acceptor configuration of supported mechanismis; saslMechanisms=<a,b>
and allow login config scope for krb5 to be configured via saslLoginConfigScope=x
On completion of drain the response is not flushed and the
client can wait a few seconds before another broker task
flushes the work. Flush the connection after updating the
linked as being drained. Also perform the work with the
connection lock held to prevent conccurent update of proton
state.
This is replacing an executor on ServerSessionPacketHandler
by a this actor.
This is to avoid creating a new runnable per packet received.
Instead of creating new Runnable, this will use a single static runnable
and the packet will be send by a message, which will be treated by a listener.
Look at ServerSessionPacketHandler on this commit for more information on how it works.
Add krb5sslloginmodule that will populate userPrincipal that can be mapped to roles independently
Generalised callback handlers to take a connection and pull certs or peerprincipal based on
callback. This bubbled up into api change in securitystore and security manager
If replication blocked anything on the journal
the processing from clients would be blocked
and nothing would work.
As part of this fix I am using an executor on ServerSessionPacketHandler
which will also scale better as the reader from Netty would be feed immediately.
Core client with netty connector and acceptor doing kerberos
jaas.doAs around sslengine init such that the SSL handshake can do kerberos ticket
generaton and validation.
The kerberos authenticated user is then validated with the security manager before
being populated into the message userId.
The feature is enabled with the kerb5Config property. When lowercase it is the
principal. With a leading uppercase char it is the login.config entry to use.
The MAPPED journal refactoring include:
- simplified lifecycle and logic (eg fixed file size with single mmap memory region)
- supports for the TimedBuffer to coalesce msyncs (via Decorator pattern)
- TLAB pooling of direct ByteBuffer like the NIO journal
- remove of old benchmarks and benchmark dependencies
The default id-cache-size is 20000 and the default
confirmation-window-size is 1MB. It turns out the 1MB
size is too small for id-cache-size.
To fix it we adjust the confirmation-window-size to 10MB. Also
a test is added to guarantee it won't break this rule when this
default value is to be changed to any new value.
When a large message is replicated to backup, a pendingID is generated
when the large message is finished. This pendingID is generated by a
BatchingIDGenerator at backup.
It is possible that a pendingID generated at backup may be a duplicate
to an ID generated at live server.
This can cause a problem when a large message with a messageID that is
the same as another largemessage's pendingID is replicated and stored
in the backup's journal, and then a deleteRecord for the pendingID
is appended. If backup becomes live and loads the journal, it will
drop the large message add record because there is a deleteRecord of
the same ID (even though it is a pendingID of another message).
As a result the expecting client will never get this large message.
So in summary, the root cause is that the pendingIDs for large
messages are generated at backup while backup is not alive.
The solution to this is that instead of the backup generating
the pendingID, we make them all be generated in advance
at live server and let them replicated to backup whereever needed.
The ID generater at backup only works when backup becomes live
(when it is properly initialized from journal).
It fixes compatibility issues with JMS Core clients using the old address model, allowing the client to query JMS temporary queues too.
you would eventually see this issue when using older clients:
AMQ119019: Queue already exists
This method name would clash with ServiceComponent
As the real meaning here on this method is just to failover
So I've renamed the method to avoid the clash with my next commit
(I've done this on a separate commit as you may need to redo this
commit from scratch again in other branches instead of lots of clashes on cherry-pick)
When a large message is being diverted, a new copy of the original
message is created and replicated (if there is a backup) to the backup.
In LargeServerMessageImpl.copy(long) it reuse a byte array to copy
message body. It is possible that one block of date is read into
the byte array before the previous read has been replicated,
causing the replicated bytes to corrupt.
If we make a copy of the byte array before replication, the corruption
of data will be avoided.
Add extra configuration to address-settings to be able to
control / enable address/queue deletion by pattern,
rather than a global toggle.
Add support in the reload logic to remove address
and/or queues if the address matches an address setting,
where it is enabled.