ARTEMIS-2313 Accumulation in HierarchicalObjectRepository cache

This commit is contained in:
Justin Bertram 2019-04-18 13:34:01 -05:00 committed by Clebert Suconic
parent 8477dd95ee
commit f155838626
4 changed files with 146 additions and 107 deletions

View File

@ -725,61 +725,64 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
server.callBrokerBindingPlugins(plugin -> plugin.beforeRemoveBinding(uniqueName, tx, deleteData));
}
addressSettingsRepository.clearCache();
try {
Binding binding = addressManager.removeBinding(uniqueName, tx);
Binding binding = addressManager.removeBinding(uniqueName, tx);
if (binding == null) {
throw new ActiveMQNonExistentQueueException();
}
if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
pagingManager.deletePageStore(binding.getAddress());
deleteDuplicateCache(binding.getAddress());
}
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
managementService.unregisterQueue(uniqueName, binding.getAddress(), queue.getRoutingType());
} else if (binding.getType() == BindingType.DIVERT) {
managementService.unregisterDivert(uniqueName, binding.getAddress());
}
AddressInfo addressInfo = getAddressInfo(binding.getAddress());
if (addressInfo != null) {
addressInfo.setBindingRemovedTimestamp(System.currentTimeMillis());
}
if (binding.getType() != BindingType.DIVERT) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
if (binding.getFilter() == null) {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, null);
} else {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, binding.getFilter().getFilterString());
if (binding == null) {
throw new ActiveMQNonExistentQueueException();
}
managementService.sendNotification(new Notification(null, CoreNotificationType.BINDING_REMOVED, props));
if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
pagingManager.deletePageStore(binding.getAddress());
deleteDuplicateCache(binding.getAddress());
}
if (binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
managementService.unregisterQueue(uniqueName, binding.getAddress(), queue.getRoutingType());
} else if (binding.getType() == BindingType.DIVERT) {
managementService.unregisterDivert(uniqueName, binding.getAddress());
}
AddressInfo addressInfo = getAddressInfo(binding.getAddress());
if (addressInfo != null) {
addressInfo.setBindingRemovedTimestamp(System.currentTimeMillis());
}
if (binding.getType() != BindingType.DIVERT) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
if (binding.getFilter() == null) {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, null);
} else {
props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, binding.getFilter().getFilterString());
}
managementService.sendNotification(new Notification(null, CoreNotificationType.BINDING_REMOVED, props));
}
binding.close();
if (server.hasBrokerBindingPlugins()) {
server.callBrokerBindingPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData));
}
return binding;
} finally {
server.clearAddressCache();
}
binding.close();
if (server.hasBrokerBindingPlugins()) {
server.callBrokerBindingPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData) );
}
return binding;
}
private void deleteDuplicateCache(SimpleString address) throws Exception {

View File

@ -339,6 +339,8 @@ public interface ActiveMQServer extends ServiceComponent {
PostOffice getPostOffice();
void clearAddressCache();
QueueFactory getQueueFactory();
ResourceManager getResourceManager();

View File

@ -2075,65 +2075,69 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return;
}
try {
Binding binding = postOffice.getBinding(queueName);
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
SimpleString address = binding.getAddress();
Queue queue = (Queue) binding.getBindable();
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, removeConsumers, autoDeleteAddress));
}
if (session != null) {
if (queue.isDurable()) {
// make sure the user has privileges to delete this queue
securityStore.check(address, queueName, CheckType.DELETE_DURABLE_QUEUE, session);
} else {
securityStore.check(address, queueName, CheckType.DELETE_NON_DURABLE_QUEUE, session);
}
}
// This check is only valid if checkConsumerCount == true
if (checkConsumerCount && queue.getConsumerCount() != 0) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithConsumers(queue.getName(), queueName, binding.getClass().getName());
}
// This check is only valid if checkMessageCount == true
if (checkMessageCount && queue.getAutoDeleteMessageCount() != -1) {
long messageCount = queue.getMessageCount();
if (queue.getMessageCount() > queue.getAutoDeleteMessageCount()) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithMessages(queue.getName(), queueName, messageCount);
}
}
queue.deleteQueue(removeConsumers);
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, removeConsumers, autoDeleteAddress));
}
AddressInfo addressInfo = getAddressInfo(address);
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
try {
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
}
callPostQueueDeletionCallbacks(address, queueName);
} finally {
clearAddressCache();
}
}
@Override
public void clearAddressCache() {
securityRepository.clearCache();
addressSettingsRepository.clearCache();
Binding binding = postOffice.getBinding(queueName);
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
SimpleString address = binding.getAddress();
Queue queue = (Queue) binding.getBindable();
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
removeConsumers, autoDeleteAddress));
}
if (session != null) {
if (queue.isDurable()) {
// make sure the user has privileges to delete this queue
securityStore.check(address, queueName, CheckType.DELETE_DURABLE_QUEUE, session);
} else {
securityStore.check(address, queueName, CheckType.DELETE_NON_DURABLE_QUEUE, session);
}
}
// This check is only valid if checkConsumerCount == true
if (checkConsumerCount && queue.getConsumerCount() != 0) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithConsumers(queue.getName(), queueName, binding.getClass().getName());
}
// This check is only valid if checkMessageCount == true
if (checkMessageCount && queue.getAutoDeleteMessageCount() != -1) {
long messageCount = queue.getMessageCount();
if (queue.getMessageCount() > queue.getAutoDeleteMessageCount()) {
throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueueWithMessages(queue.getName(), queueName, messageCount);
}
}
queue.deleteQueue(removeConsumers);
if (hasBrokerQueuePlugins()) {
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress));
}
AddressInfo addressInfo = getAddressInfo(address);
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated() && !isAddressBound(address.toString()) && addressSettingsRepository.getMatch(address.toString()).getAutoDeleteAddressesDelay() == 0) {
try {
removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
}
callPostQueueDeletionCallbacks(address, queueName);
}
@Override

View File

@ -24,12 +24,16 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
@ -222,4 +226,30 @@ public class TemporaryDestinationTest extends JMSTestBase {
}
}
}
@Test
public void testForSecurityCacheLeak() throws Exception {
server.getSecurityStore().setSecurityEnabled(true);
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("IDo", "Exist");
securityManager.getConfiguration().addRole("IDo", "myrole");
Role myRole = new Role("myrole", true, true, true, true, true, true, true, true, true, true);
Set<Role> anySet = new HashSet<>();
anySet.add(myRole);
server.getSecurityRepository().addMatch("#", anySet);
try {
conn = addConnection(cf.createConnection("IDo", "Exist"));
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 10; i++) {
TemporaryQueue temporaryQueue = s.createTemporaryQueue();
temporaryQueue.delete();
}
assertEquals(0, server.getSecurityRepository().getCacheSize());
} finally {
if (conn != null) {
conn.close();
}
}
}
}