ARTEMIS-358 topic mistakenly removed with sub
The problem here is that the management notification listener was mistakenly removing the topic itself instead of just the non-durable subscription. In general I can't see why StompProtocolManager even needs to keep track of the destinations when the broker already does that. As far as I can tell it is redundant and it's clearly error-prone. Therefore I'm removing the destination tracking from StompProtocolManager altogether.
This commit is contained in:
parent
0f9c379e99
commit
ddc95a0f28
|
@ -22,7 +22,6 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
|
@ -31,20 +30,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
|||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
|
@ -53,8 +45,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
|
||||
|
@ -62,7 +52,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
|
|||
/**
|
||||
* StompProtocolManager
|
||||
*/
|
||||
class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, NotificationListener {
|
||||
class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
@ -78,8 +68,6 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
|
|||
// key => connection ID, value => Stomp session
|
||||
private final Map<Object, StompSession> sessions = new HashMap<>();
|
||||
|
||||
private final Set<String> destinations = new ConcurrentHashSet<>();
|
||||
|
||||
private final List<StompFrameInterceptor> incomingInterceptors;
|
||||
private final List<StompFrameInterceptor> outgoingInterceptors;
|
||||
|
||||
|
@ -94,12 +82,6 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
|
|||
this.factory = factory;
|
||||
this.server = server;
|
||||
this.executor = server.getExecutorFactory().getExecutor();
|
||||
ManagementService service = server.getManagementService();
|
||||
if (service != null) {
|
||||
//allow management message to pass
|
||||
destinations.add(service.getManagementAddress().toString());
|
||||
service.addNotificationListener(this);
|
||||
}
|
||||
this.incomingInterceptors = incomingInterceptors;
|
||||
this.outgoingInterceptors = outgoingInterceptors;
|
||||
}
|
||||
|
@ -422,45 +404,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
|
|||
}
|
||||
|
||||
public boolean destinationExists(String destination) {
|
||||
return destinations.contains(destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNotification(Notification notification) {
|
||||
if (!(notification.getType() instanceof CoreNotificationType))
|
||||
return;
|
||||
|
||||
CoreNotificationType type = (CoreNotificationType) notification.getType();
|
||||
|
||||
TypedProperties props = notification.getProperties();
|
||||
|
||||
switch (type) {
|
||||
case BINDING_ADDED: {
|
||||
if (!props.containsProperty(ManagementHelper.HDR_BINDING_TYPE)) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.bindingTypeNotSpecified();
|
||||
}
|
||||
|
||||
Integer bindingType = props.getIntProperty(ManagementHelper.HDR_BINDING_TYPE);
|
||||
|
||||
if (bindingType == BindingType.DIVERT_INDEX) {
|
||||
return;
|
||||
}
|
||||
|
||||
SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
|
||||
|
||||
destinations.add(address.toString());
|
||||
|
||||
break;
|
||||
}
|
||||
case BINDING_REMOVED: {
|
||||
SimpleString address = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
|
||||
destinations.remove(address.toString());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
//ignore all others
|
||||
break;
|
||||
}
|
||||
return server.getPostOffice().getAddresses().contains(SimpleString.toSimpleString(destination));
|
||||
}
|
||||
|
||||
public ActiveMQServer getServer() {
|
||||
|
|
Loading…
Reference in New Issue