ARTEMIS-463 Refactoring on Openwire

https://issues.apache.org/jira/browse/ARTEMIS-463

This was a team effort from Clebert Suconic and Howard Gao
This commit is contained in:
Clebert Suconic 2016-04-01 16:29:55 -04:00 committed by jbertram
parent 2e66673048
commit 6ddf486f8f
52 changed files with 1841 additions and 2611 deletions

View File

@ -100,6 +100,10 @@ public class NettyConnection implements Connection {
}
// Connection implementation ----------------------------
@Override
public void setAutoRead(boolean autoRead) {
channel.config().setAutoRead(autoRead);
}
@Override
public synchronized boolean isWritable(ReadyListener callback) {

View File

@ -104,6 +104,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return transportConnection.getID();
}
public String getLocalAddress() {
return transportConnection.getLocalAddress();
}
@Override
public String getRemoteAddress() {
return transportConnection.getRemoteAddress();

View File

@ -43,6 +43,12 @@ public interface Connection {
void fireReady(boolean ready);
/**
* This will disable reading from the channel.
* This is basically the same as blocking the reading.
* */
void setAutoRead(boolean autoRead);
/**
* returns the unique id of this wire.
*

View File

@ -1047,28 +1047,32 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
private boolean internalCreateQueue(final String queueName,
final String selectorString,
final boolean durable) throws Exception {
if (queues.get(queueName) != null) {
return false;
}
else {
ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
// Convert from JMS selector to core filter
String coreFilterString = null;
if (selectorString != null) {
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
// TODO: there was an openwire test failng because of this
// is this really needed for FailoverClusterTest ?
synchronized (queues) {
if (queues.get(queueName) != null) {
return false;
}
else {
ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
// Convert from JMS selector to core filter
String coreFilterString = null;
queues.put(queueName, activeMQQueue);
if (selectorString != null) {
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
}
this.recoverregistryBindings(queueName, PersistedType.Queue);
Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
jmsManagementService.registerQueue(activeMQQueue, queue);
queues.put(queueName, activeMQQueue);
return true;
this.recoverregistryBindings(queueName, PersistedType.Queue);
jmsManagementService.registerQueue(activeMQQueue, queue);
return true;
}
}
}

View File

@ -91,6 +91,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
((ServerConsumer) consumer).receiveCredits(-1);
}
@Override
public void browserFinished(ServerConsumer consumer) {
}
@Override
public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {

View File

@ -82,6 +82,11 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
public void browserFinished(ServerConsumer consumer) {
}
@Override
public boolean hasCredits(ServerConsumer consumerID) {
return true;

View File

@ -28,22 +28,10 @@ public class AMQTransactionImpl extends TransactionImpl {
private boolean rollbackForClose = false;
public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) {
super(storageManager, timeoutSeconds);
}
public AMQTransactionImpl(StorageManager storageManager) {
super(storageManager);
}
public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
super(xid, storageManager, timeoutSeconds);
}
public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) {
super(id, xid, storageManager);
}
@Override
public RefsOperation createRefsOperation(Queue queue) {
return new AMQrefsOperation(queue, storageManager);
@ -55,6 +43,8 @@ public class AMQTransactionImpl extends TransactionImpl {
super(queue, storageManager);
}
// This is because the Rollbacks happen through the consumer, not through the server's
@Override
public void afterRollback(Transaction tx) {
if (rollbackForClose) {

View File

@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
@Override
public ServerMessage inbound(Object message) {
// TODO: implement this
return null;
private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) {
this.marshaller = marshaller;
}
@Override
@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter {
return null;
}
//convert an ActiveMQ Artemis message to coreMessage
public static void toCoreMessage(ServerMessageImpl coreMessage,
Message messageSend,
WireFormat marshaller) throws IOException {
@Override
public ServerMessage inbound(Object message) throws Exception {
Message messageSend = (Message)message;
ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
@ -391,6 +395,15 @@ public class OpenWireMessageConverter implements MessageConverter {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
ByteSequence origDestBytes = marshaller.marshal(origDest);
origDestBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
}
return coreMessage;
}
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
@ -430,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount,
AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());

View File

@ -17,16 +17,13 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import java.util.ArrayList;
import java.util.Collections;
import javax.transaction.xa.XAException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
@ -37,24 +34,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@ -64,40 +53,31 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@ -109,32 +89,36 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private OpenWireFormatFactory wireFactory;
private boolean tightEncodingEnabled = true;
private boolean prefixPacketSize = true;
private BrokerId brokerId;
protected final ProducerId advisoryProducerId = new ProducerId();
// from broker
protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
// TODO-NOW: this can probably go away
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
private String brokerName;
private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
// Clebert: Artemis already has a Resource Manager. Need to remove this..
// The TransactionID extends XATransactionID, so all we need is to convert the XID here
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
private final LinkedList<TopologyMember> members = new LinkedList<>();
private final ScheduledExecutorService scheduledPool;
//bean properties
//http://activemq.apache.org/failover-transport-reference.html
private boolean rebalanceClusterClients = false;
private boolean updateClusterClients = false;
private boolean updateClusterClientsOnRemove = false;
private final OpenWireMessageConverter messageConverter;
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@ -142,12 +126,82 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
// preferred prop, should be done via config
wireFactory.setCacheEnabled(false);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
ManagementService service = server.getManagementService();
scheduledPool = server.getScheduledPool();
if (service != null) {
service.addNotificationListener(this);
this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
final ClusterManager clusterManager = this.server.getClusterManager();
// TODO-NOW: use a property name for the cluster connection
ClusterConnection cc = clusterManager.getDefaultConnection(null);
if (cc != null) {
cc.addClusterTopologyListener(this);
}
}
public OpenWireFormat getNewWireFormat() {
return (OpenWireFormat)wireFactory.createWireFormat();
}
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (topologyMap.put(member.getNodeId(), member) == null) {
updateClientClusterInfo();
}
}
public void nodeDown(long eventUID, String nodeID) {
if (topologyMap.remove(nodeID) != null) {
updateClientClusterInfo();
}
}
public void removeConnection(ConnectionInfo info,
Throwable error) throws InvalidClientIDException {
synchronized (clientIdSet) {
String clientId = info.getClientId();
if (clientId != null) {
AMQConnectionContext context = this.clientIdSet.get(clientId);
if (context != null && context.decRefCount() == 0) {
//connection is still there and need to close
context.getConnection().disconnect(error != null);
this.connections.remove(this);//what's that for?
this.clientIdSet.remove(clientId);
}
}
else {
throw new InvalidClientIDException("No clientID specified for connection disconnect request");
}
}
}
public ScheduledExecutorService getScheduledPool() {
return scheduledPool;
}
public ActiveMQServer getServer() {
return server;
}
private void updateClientClusterInfo() {
synchronized (members) {
members.clear();
members.addAll(topologyMap.values());
}
for (OpenWireConnection c : this.connections) {
ConnectionControl control = newConnectionControl();
try {
c.updateClient(control);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
c.sendException(e);
}
}
}
@Override
@ -169,20 +223,20 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
owConn.init();
OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
owConn.sendHandshake();
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
}
@Override
public MessageConverter getConverter() {
return new OpenWireMessageConverter();
return messageConverter;
}
@Override
public void removeHandler(String name) {
// TODO Auto-generated method stub
}
@Override
@ -225,119 +279,60 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
// TODO Auto-generated method stub
}
public void handleCommand(OpenWireConnection openWireConnection, Object command) throws Exception {
Command amqCmd = (Command) command;
byte type = amqCmd.getDataStructureType();
switch (type) {
case CommandTypes.CONNECTION_INFO:
break;
case CommandTypes.CONNECTION_CONTROL:
/** The ConnectionControl packet sent from client informs the broker that is capable of supporting dynamic
* failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we
* simply drop the packet. See: ACTIVEMQ6-108 */
break;
case CommandTypes.MESSAGE_PULL:
MessagePull messagePull = (MessagePull) amqCmd;
openWireConnection.processMessagePull(messagePull);
break;
case CommandTypes.CONSUMER_CONTROL:
break;
default:
throw new IllegalStateException("Cannot handle command: " + command);
}
}
public void sendReply(final OpenWireConnection connection, final Command command) {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
}
@Override
public void done() {
send(connection, command);
}
});
}
public boolean send(final OpenWireConnection connection, final Command command) {
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("sending " + command);
}
synchronized (connection) {
if (connection.isDestroyed()) {
return false;
}
try {
connection.physicalSend(command);
}
catch (Exception e) {
return false;
}
catch (Throwable t) {
return false;
}
return true;
}
}
public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception {
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();
if (!this.validateUser(username, password)) {
throw new SecurityException("User name [" + username + "] or password is invalid.");
}
String clientId = info.getClientId();
if (clientId == null) {
throw new InvalidClientIDException("No clientID specified for connection request");
}
synchronized (clientIdSet) {
AMQConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) {
if (context.isAllowLinkStealing()) {
clientIdSet.remove(clientId);
if (oldContext.getConnection() != null) {
OpenWireConnection connection = oldContext.getConnection();
connection.disconnect(true);
}
else {
// log error
}
AMQConnectionContext context;
context = clientIdSet.get(clientId);
if (context != null) {
if (info.isFailoverReconnect()) {
OpenWireConnection oldConnection = context.getConnection();
oldConnection.disconnect(true);
connections.remove(oldConnection);
connection.reconnect(context, info);
}
else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldContext.getConnection().getRemoteAddress());
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
}
}
else {
//new connection
context = connection.initContext(info);
clientIdSet.put(clientId, context);
}
connections.add(connection);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
// do not distribute passwords in advisory messages. usernames okay
ConnectionInfo copy = info.copy();
copy.setPassword("");
fireAdvisory(context, topic, copy);
// init the conn
context.getConnection().addSessions( context.getConnectionState().getSessionIds());
}
connections.add(context.getConnection());
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
// do not distribute passwords in advisory messages. usernames okay
ConnectionInfo copy = info.copy();
copy.setPassword("");
fireAdvisory(context, topic, copy);
connectionInfos.put(copy.getConnectionId(), copy);
// init the conn
addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
}
private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
this.fireAdvisory(context, topic, copy, null);
}
public BrokerId getBrokerId() {
// TODO: Use the Storage ID here...
if (brokerId == null) {
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
}
@ -347,7 +342,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
/*
* See AdvisoryBroker.fireAdvisory()
*/
private void fireAdvisory(AMQConnectionContext context,
public void fireAdvisory(AMQConnectionContext context,
ActiveMQTopic topic,
Command command,
ConsumerId targetConsumerId) throws Exception {
@ -372,13 +367,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
boolean originalFlowControl = context.isProducerFlowControl();
final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
AMQSession sess = context.getConnection().getAdvisorySession();
if (sess != null) {
sess.send(producerExchange, advisoryMessage, false);
sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
}
}
finally {
@ -392,12 +386,46 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
}
catch (Exception e) {
brokerName = "localhost";
brokerName = server.getNodeID().toString();
}
}
return brokerName;
}
protected ConnectionControl newConnectionControl() {
ConnectionControl control = new ConnectionControl();
String uri = generateMembersURI(rebalanceClusterClients);
control.setConnectedBrokers(uri);
control.setRebalanceConnection(rebalanceClusterClients);
return control;
}
private String generateMembersURI(boolean flip) {
String uri;
StringBuffer connectedBrokers = new StringBuffer();
String separator = "";
synchronized (members) {
if (members.size() > 0) {
for (TopologyMember member : members) {
connectedBrokers.append(separator).append(member.toURI());
separator = ",";
}
// The flip exists to guarantee even distribution of URIs when sent to the client
// in case of failures you won't get all the connections failing to a single server.
if (flip && members.size() > 1) {
members.addLast(members.removeFirst());
}
}
}
uri = connectedBrokers.toString();
return uri;
}
public boolean isFaultTolerantConfiguration() {
return false;
}
@ -420,192 +448,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
public boolean isStopping() {
return false;
}
public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
ConnectionState cs = theConn.getState();
if (cs == null) {
throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
}
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
AMQSession amqSession = sessions.get(sessionId);
if (amqSession == null) {
throw new IllegalStateException("Session not exist! : " + sessionId);
}
ActiveMQDestination destination = info.getDestination();
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
}
if (destination.isQueue()) {
OpenWireUtil.validateDestination(destination, amqSession);
}
DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
this.addDestination(theConn, destInfo);
}
amqSession.createProducer(info);
try {
ss.addProducer(info);
}
catch (IllegalStateException e) {
amqSession.removeProducer(info);
}
}
}
public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
// Todo: add a destination interceptors holder here (amq supports this)
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
ConnectionState cs = theConn.getState();
if (cs == null) {
throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
ActiveMQDestination destination = info.getDestination();
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
}
}
AMQSession amqSession = sessions.get(sessionId);
if (amqSession == null) {
throw new IllegalStateException("Session not exist! : " + sessionId);
}
amqSession.createConsumer(info, amqSession);
ss.addConsumer(info);
}
}
public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
Iterator<SessionId> iter = sessionSet.iterator();
while (iter.hasNext()) {
SessionId sid = iter.next();
addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true);
}
}
public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
return addSession(theConn, ss, false);
}
public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this);
amqSession.initialize();
amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession);
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
return amqSession;
}
public void removeConnection(AMQConnectionContext context, ConnectionInfo info, Throwable error) {
// todo roll back tx
this.connections.remove(context.getConnection());
this.connectionInfos.remove(info.getConnectionId());
String clientId = info.getClientId();
if (clientId != null) {
this.clientIdSet.remove(clientId);
}
}
public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
AMQSession session = sessions.remove(info.getSessionId());
if (session != null) {
session.close();
}
}
public void removeProducer(ProducerId id) {
SessionId sessionId = id.getParentId();
AMQSession session = sessions.get(sessionId);
session.removeProducer(id);
}
public AMQSession getSession(SessionId sessionId) {
return sessions.get(sessionId);
}
public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
if (dest.isQueue()) {
SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
this.server.destroyQueue(qName);
}
else {
Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
Iterator<Binding> iterator = bindings.getBindings().iterator();
while (iterator.hasNext()) {
Queue b = (Queue) iterator.next().getBindable();
if (b.getConsumerCount() > 0) {
throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
}
if (b.isDurable()) {
throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
}
b.deleteQueue();
}
}
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = connection.getConext();
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
fireAdvisory(context, topic, advInfo);
}
}
public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) {
SimpleString qName = OpenWireUtil.toCoreAddress(dest);
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
if (binding == null) {
if (connection.getState().getInfo() != null) {
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
server.getSecurityStore().check(qName, checkType, connection);
server.checkQueueCreationLimit(connection.getUsername());
}
ConnectionInfo connInfo = connection.getState().getInfo();
this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
}
if (dest.isTemporary()) {
connection.registerTempQueue(dest);
}
}
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
AMQConnectionContext context = connection.getConext();
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
fireAdvisory(context, topic, advInfo);
}
}
public void endTransaction(TransactionInfo info) throws Exception {
AMQSession txSession = transactions.get(info.getTransactionId());
@ -645,20 +487,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (txSession != null) {
txSession.rollback(info);
}
transactions.remove(info.getTransactionId());
}
public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
List<TransactionId> recovered = new ArrayList<>();
if (sIds != null) {
for (SessionId sid : sIds) {
AMQSession s = this.sessions.get(sid);
if (s != null) {
s.recover(recovered);
}
}
else if (info.getTransactionId().isLocalTransaction()) {
//during a broker restart, recovered local transaction may not be registered
//in that case we ignore and let the tx removed silently by connection.
//see AMQ1925Test.testAMQ1925_TXBegin
}
return recovered.toArray(new TransactionId[0]);
else {
throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
}
transactions.remove(info.getTransactionId());
}
public boolean validateUser(String login, String passcode) {
@ -681,64 +518,63 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
transactions.remove(xid);
}
/**
* TODO: remove this, use the regular ResourceManager from the Server's
*/
public void registerTx(TransactionId txId, AMQSession amqSession) {
transactions.put(txId, amqSession);
}
//advisory support
@Override
public void onNotification(Notification notif) {
try {
if (notif.getType() instanceof CoreNotificationType) {
CoreNotificationType type = (CoreNotificationType) notif.getType();
switch (type) {
case CONSUMER_SLOW:
fireSlowConsumer(notif);
break;
default:
break;
}
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
}
}
private void fireSlowConsumer(Notification notif) throws Exception {
SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
AMQSession session = sessions.get(sessionId);
AMQConsumer consumer = session.getConsumer(coreConsumerId);
ActiveMQDestination destination = consumer.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
ConnectionId connId = sessionId.getParentId();
OpenWireConnection cc = this.brokerConnectionStates.get(connId);
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
}
}
public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
}
public void sendBrokerInfo(OpenWireConnection connection) {
public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(server.getIdentity());
brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
brokerInfo.setBrokerName(getBrokerName());
brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
brokerInfo.setBrokerURL(connection.getLocalAddress());
//cluster support yet to support
brokerInfo.setPeerBrokerInfos(null);
connection.dispatchAsync(brokerInfo);
connection.dispatch(brokerInfo);
}
public void setRebalanceClusterClients(boolean rebalance) {
this.rebalanceClusterClients = rebalance;
}
public boolean isRebalanceClusterClients() {
return this.rebalanceClusterClients;
}
public void setUpdateClusterClients(boolean updateClusterClients) {
this.updateClusterClients = updateClusterClients;
}
public boolean isUpdateClusterClients() {
return this.updateClusterClients;
}
public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
}
public boolean isUpdateClusterClientsOnRemove() {
return this.updateClusterClientsOnRemove;
}
public void setBrokerName(String name) {
this.brokerName = name;
}
public static XAException newXAException(String s, int errorCode) {
XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
xaException.errorCode = errorCode;
return xaException;
}
}

View File

@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.artemis.api.core.SimpleString;
public class OpenWireUtil {
@ -64,23 +60,6 @@ public class OpenWireUtil {
}
}
/**
* Checks to see if this destination exists. If it does not throw an invalid destination exception.
*
* @param destination
* @param amqSession
*/
public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception {
if (destination.isQueue()) {
AMQServerSession coreSession = amqSession.getCoreSession();
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
}
}
}
/*
*This util converts amq wildcards to compatible core wildcards
*The conversion is like this:

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
public class SendingResult {
private boolean blockNextSend;
private PagingStoreImpl blockPagingStore;
private SimpleString blockingAddress;
public void setBlockNextSend(boolean block) {
this.blockNextSend = block;
}
public boolean isBlockNextSend() {
return this.blockNextSend;
}
public void setBlockPagingStore(PagingStoreImpl store) {
this.blockPagingStore = store;
}
public PagingStoreImpl getBlockPagingStore() {
return this.blockPagingStore;
}
public void setBlockingAddress(SimpleString address) {
this.blockingAddress = address;
}
public SimpleString getBlockingAddress() {
return this.blockingAddress;
}
public boolean isSendFailIfNoSpace() {
AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
return policy == AddressFullMessagePolicy.FAIL;
}
}

View File

@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange {
private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) {
public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) {
super(amqSession);
this.consumerMap = consumerMap;
this.consumerMap = new HashMap<>();
for (AMQConsumer consumer : consumerList) {
consumerMap.put(consumer.getOpenwireDestination(), consumer);
}
}
@Override

View File

@ -17,9 +17,11 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.WireFormatInfo;
@ -47,6 +49,8 @@ public class AMQConnectionContext {
private boolean clientMaster = true;
private ConnectionState connectionState;
private XATransactionId xid;
private AtomicInteger refCount = new AtomicInteger(1);
private Command lastCommand;
public AMQConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
@ -248,4 +252,19 @@ public class AMQConnectionContext {
return false;
}
public void incRefCount() {
refCount.incrementAndGet();
}
public int decRefCount() {
return refCount.decrementAndGet();
}
public void setLastCommand(Command lastCommand) {
this.lastCommand = lastCommand;
}
public Command getLastCommand() {
return this.lastCommand;
}
}

View File

@ -27,7 +27,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@ -36,23 +44,15 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class AMQConsumer implements BrowserListener {
public class AMQConsumer {
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination actualDest;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private long nativeId = -1;
private SimpleString subQueueName = null;
private final int prefetchSize;
private int prefetchSize;
private AtomicInteger windowAvailable;
private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
private long messagePullSequence = 0;
@ -63,7 +63,7 @@ public class AMQConsumer implements BrowserListener {
ConsumerInfo info,
ScheduledExecutorService scheduledPool) {
this.session = amqSession;
this.actualDest = d;
this.openwireDestination = d;
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@ -73,75 +73,102 @@ public class AMQConsumer implements BrowserListener {
}
}
public void init() throws Exception {
AMQServerSession coreSession = session.getCoreSession();
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
this.nativeId = nativeId;
AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
serverConsumer.setAmqConsumer(this);
}
private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
nativeId = session.getCoreServer().getStorageManager().generateID();
String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
SimpleString address;
if (this.actualDest.isTopic()) {
String physicalName = this.actualDest.getPhysicalName();
if (physicalName.contains(".>")) {
//wildcard
physicalName = OpenWireUtil.convertWildcard(physicalName);
}
// on recreate we don't need to create queues
if (openwireDestination.isTopic()) {
address = new SimpleString("jms.topic." + physicalName);
if (info.isDurable()) {
subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName()));
QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
if (result.isExists()) {
// Already exists
if (result.getConsumerCount() > 0) {
throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
}
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
SimpleString oldFilterString = result.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
SimpleString oldTopicName = result.getAddress();
boolean topicChanged = !oldTopicName.equals(address);
if (selectorChanged || topicChanged) {
// Delete the old durable sub
coreSession.deleteQueue(subQueueName);
// Create the new one
coreSession.createQueue(address, subQueueName, selector, false, true);
}
}
else {
coreSession.createQueue(address, subQueueName, selector, false, true);
}
}
else {
subQueueName = new SimpleString(UUID.randomUUID().toString());
coreSession.createQueue(address, subQueueName, selector, true, false);
}
coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
return serverConsumer;
}
else {
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
}
SimpleString queueName = new SimpleString("jms.queue." + physicalName);
AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
if (addrSettings != null) {
//see PolicyEntry
if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
//sends back a ConsumerControl
ConsumerControl cc = new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(0);
session.getConnection().dispatch(cc);
}
}
return serverConsumer;
if (info.isBrowser()) {
AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId);
coreConsumer.setBrowserListener(this);
}
}
private SimpleString createTopicSubscription(boolean isDurable,
String clientID,
String physicalName,
String subscriptionName,
SimpleString selector,
SimpleString address) throws Exception {
SimpleString queueName;
if (isDurable) {
queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) {
// Already exists
if (result.getConsumerCount() > 0) {
throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
}
SimpleString oldFilterString = result.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
SimpleString oldTopicName = result.getAddress();
boolean topicChanged = !oldTopicName.equals(address);
if (selectorChanged || topicChanged) {
// Delete the old durable sub
session.getCoreSession().deleteQueue(queueName);
// Create the new one
session.getCoreSession().createQueue(address, queueName, selector, false, true);
}
}
else {
session.getCoreSession().createQueue(address, queueName, selector, false, true);
}
}
else {
queueName = new SimpleString(UUID.randomUUID().toString());
session.getCoreSession().createQueue(address, queueName, selector, true, false);
}
return queueName;
}
public long getNativeId() {
return this.nativeId;
}
@ -189,7 +216,7 @@ public class AMQConsumer implements BrowserListener {
public void handleDeliverNullDispatch() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(getId());
md.setDestination(actualDest);
md.setDestination(openwireDestination);
session.deliverMessage(md);
windowAvailable.decrementAndGet();
}
@ -210,9 +237,16 @@ public class AMQConsumer implements BrowserListener {
mi = iter.next();
if (mi.amqId.equals(lastm)) {
n++;
iter.remove();
session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
session.getCoreSession().commit();
if (!isLocalTx) {
iter.remove();
session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
}
else {
mi.setLocalAcked(true);
}
if (tid == null) {
session.getCoreSession().commit();
}
break;
}
}
@ -220,7 +254,7 @@ public class AMQConsumer implements BrowserListener {
else if (ack.isRedeliveredAck()) {
//client tells that this message is for redlivery.
//do nothing until poisoned.
n = 1;
n = ack.getMessageCount();
}
else if (ack.isPoisonAck()) {
//send to dlq
@ -251,7 +285,7 @@ public class AMQConsumer implements BrowserListener {
}
else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
//ToDo: implement with tests
n = 1;
n = ack.getMessageCount();
}
else {
Iterator<MessageInfo> iter = deliveringRefs.iterator();
@ -294,7 +328,6 @@ public class AMQConsumer implements BrowserListener {
acquireCredit(n);
}
@Override
public void browseFinished() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
@ -304,11 +337,6 @@ public class AMQConsumer implements BrowserListener {
session.deliverMessage(md);
}
public boolean handledTransactionalMsg() {
// TODO Auto-generated method stub
return false;
}
//this is called before session commit a local tx
public void finishTx() throws Exception {
MessageInfo lastMi = null;
@ -346,10 +374,6 @@ public class AMQConsumer implements BrowserListener {
}
}
public org.apache.activemq.command.ActiveMQDestination getDestination() {
return actualDest;
}
public ConsumerInfo getInfo() {
return info;
}
@ -370,10 +394,22 @@ public class AMQConsumer implements BrowserListener {
session.removeConsumer(nativeId);
}
public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
return actualDest;
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
return openwireDestination;
}
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
this.windowAvailable.set(prefetchSize);
this.info.setPrefetchSize(prefetchSize);
if (this.prefetchSize > 0) {
session.getCoreSession().promptDelivery(nativeId);
}
}
/**
* The MessagePullHandler is used with slow consumer policies.
* */
private class MessagePullHandler {
private long next = -1;

View File

@ -22,41 +22,11 @@ import org.apache.activemq.command.MessagePull;
public abstract class AMQConsumerBrokerExchange {
protected final AMQSession amqSession;
private AMQConnectionContext connectionContext;
private boolean wildcard;
public AMQConsumerBrokerExchange(AMQSession amqSession) {
this.amqSession = amqSession;
}
/**
* @return the connectionContext
*/
public AMQConnectionContext getConnectionContext() {
return this.connectionContext;
}
/**
* @param connectionContext the connectionContext to set
*/
public void setConnectionContext(AMQConnectionContext connectionContext) {
this.connectionContext = connectionContext;
}
/**
* @return the wildcard
*/
public boolean isWildcard() {
return this.wildcard;
}
/**
* @param wildcard the wildcard to set
*/
public void setWildcard(boolean wildcard) {
this.wildcard = wildcard;
}
public abstract void acknowledge(MessageAck ack) throws Exception;
public abstract void processMessagePull(MessagePull messagePull) throws Exception;

View File

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
public class AMQProducer {
private AMQSession amqSession;
private ProducerInfo info;
public AMQProducer(AMQSession amqSession, ProducerInfo info) {
this.amqSession = amqSession;
this.info = info;
}
public void init() throws Exception {
// If the destination is specified check that it exists.
if (info.getDestination() != null) {
OpenWireUtil.validateDestination(info.getDestination(), amqSession);
}
}
}

View File

@ -16,34 +16,16 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.state.ProducerState;
public class AMQProducerBrokerExchange {
private AMQConnectionContext connectionContext;
private ProducerState producerState;
private boolean mutable = true;
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public AMQProducerBrokerExchange() {
}
public AMQProducerBrokerExchange copy() {
AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
rc.connectionContext = connectionContext.copy();
rc.producerState = producerState;
rc.mutable = mutable;
return rc;
}
/**
* @return the connectionContext
*/
@ -58,20 +40,6 @@ public class AMQProducerBrokerExchange {
this.connectionContext = connectionContext;
}
/**
* @return the mutable
*/
public boolean isMutable() {
return this.mutable;
}
/**
* @param mutable the mutable to set
*/
public void setMutable(boolean mutable) {
this.mutable = mutable;
}
/**
* @return the producerState
*/
@ -86,119 +54,6 @@ public class AMQProducerBrokerExchange {
this.producerState = producerState;
}
/**
* Enforce duplicate suppression using info from persistence adapter
*
* @return false if message should be ignored as a duplicate
*/
public boolean canDispatch(Message messageSend) {
boolean canDispatch = true;
if (auditProducerSequenceIds && messageSend.isPersistent()) {
final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
if (isNetworkProducer) {
// messages are multiplexed on this producer so we need to query the
// persistenceAdapter
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
if (producerSequenceId <= lastStoredForMessageProducer) {
canDispatch = false;
}
}
else if (producerSequenceId <= lastSendSequenceNumber.get()) {
canDispatch = false;
if (messageSend.isInTransaction()) {
}
else {
}
}
else {
// track current so we can suppress duplicates later in the stream
lastSendSequenceNumber.set(producerSequenceId);
}
}
return canDispatch;
}
private long getStoredSequenceIdForMessage(MessageId messageId) {
return -1;
}
public void setLastStoredSequenceId(long l) {
}
public void incrementSend() {
flowControlInfo.incrementSend();
}
public void blockingOnFlowControl(boolean blockingOnFlowControl) {
flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
}
public boolean isBlockedForFlowControl() {
return flowControlInfo.isBlockingOnFlowControl();
}
public void resetFlowControl() {
flowControlInfo.reset();
}
public long getTotalTimeBlocked() {
return flowControlInfo.getTotalTimeBlocked();
}
public int getPercentageBlocked() {
double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
return (int) value * 100;
}
public static class FlowControlInfo {
private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
private AtomicLong totalSends = new AtomicLong();
private AtomicLong sendsBlocked = new AtomicLong();
private AtomicLong totalTimeBlocked = new AtomicLong();
public boolean isBlockingOnFlowControl() {
return blockingOnFlowControl.get();
}
public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
this.blockingOnFlowControl.set(blockingOnFlowControl);
if (blockingOnFlowControl) {
incrementSendBlocked();
}
}
public long getTotalSends() {
return totalSends.get();
}
public void incrementSend() {
this.totalSends.incrementAndGet();
}
public long getSendsBlocked() {
return sendsBlocked.get();
}
public void incrementSendBlocked() {
this.sendsBlocked.incrementAndGet();
}
public long getTotalTimeBlocked() {
return totalTimeBlocked.get();
}
public void incrementTimeBlocked(long time) {
this.totalTimeBlocked.addAndGet(time);
}
public void reset() {
blockingOnFlowControl.set(false);
totalSends.set(0);
sendsBlocked.set(0);
totalTimeBlocked.set(0);
}
}
}

View File

@ -23,8 +23,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@ -34,6 +32,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerConsumer extends ServerConsumerImpl {
// TODO-NOW: remove this once unified
AMQConsumer amqConsumer;
public AMQConsumer getAmqConsumer() {
return amqConsumer;
}
/** TODO-NOW: remove this once unified */
public void setAmqConsumer(AMQConsumer amqConsumer) {
this.amqConsumer = amqConsumer;
}
public AMQServerConsumer(long consumerID,
AMQServerSession serverSession,
QueueBinding binding,
@ -51,81 +61,6 @@ public class AMQServerConsumer extends ServerConsumerImpl {
super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
}
public void setBrowserListener(BrowserListener listener) {
AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer);
newBrowserDeliverer.listener = listener;
this.browserDeliverer = newBrowserDeliverer;
}
private class AMQBrowserDeliverer extends BrowserDeliverer {
private BrowserListener listener = null;
public AMQBrowserDeliverer(final BrowserDeliverer other) {
super(other.iterator);
}
@Override
public synchronized void run() {
// if the reference was busy during the previous iteration, handle it now
if (current != null) {
try {
HandleStatus status = handle(current);
if (status == HandleStatus.BUSY) {
return;
}
if (status == HandleStatus.HANDLED) {
proceedDeliver(current);
}
current = null;
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current);
return;
}
}
MessageReference ref = null;
HandleStatus status;
while (true) {
try {
ref = null;
synchronized (messageQueue) {
if (!iterator.hasNext()) {
//here we need to send a null for amq browsers
if (listener != null) {
listener.browseFinished();
}
break;
}
ref = iterator.next();
status = handle(ref);
}
if (status == HandleStatus.HANDLED) {
proceedDeliver(ref);
}
else if (status == HandleStatus.BUSY) {
// keep a reference on the current message reference
// to handle it next time the browser deliverer is executed
current = ref;
break;
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref);
break;
}
}
}
}
public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
synchronized (this.deliveringRefs) {
for (MessageReference ref : refs) {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
@Override
protected void doClose(final boolean failed) throws Exception {
Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
for (ServerConsumer consumer : consumersClone) {
AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
amqConsumer.setStarted(false);
}
synchronized (this) {
if (tx != null && tx.getXid() == null) {
((AMQTransactionImpl) tx).setRollbackForClose();
@ -143,6 +150,8 @@ public class AMQServerSession extends ServerSessionImpl {
}
//amq specific behavior
// TODO: move this to AMQSession
public void amqRollback(Set<Long> acked) throws Exception {
if (tx == null) {
// Might be null if XA
@ -218,7 +227,9 @@ public class AMQServerSession extends ServerSessionImpl {
final boolean supportLargeMessage,
final Integer credits) throws Exception {
if (this.internal) {
//internal sessions doesn't check security
// Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
//internal sessions doesn't check security:: Why??? //// what's the reason for that? Where a link?
Binding binding = postOffice.getBinding(queueName);
@ -309,6 +320,8 @@ public class AMQServerSession extends ServerSessionImpl {
return queue;
}
// Clebert TODO: Get rid of these mthods
@Override
protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
if (!this.internal) {

View File

@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerSessionFactory implements ServerSessionFactory {
private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
public static AMQServerSessionFactory getInstance() {
return singleInstance;
}
private AMQServerSessionFactory() {
}
@Override
public ServerSessionImpl createCoreSession(String name,
String username,

View File

@ -16,9 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.ResourceAllocationException;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -26,52 +25,56 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
private AMQServerSession coreSession;
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
private ConnectionInfo connInfo;
private AMQServerSession coreSession;
private SessionInfo sessInfo;
private ActiveMQServer server;
private OpenWireConnection connection;
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
private Map<Long, AMQProducer> producers = new HashMap<>();
private AtomicBoolean started = new AtomicBoolean(false);
private TransactionId txId = null;
@ -82,6 +85,11 @@ public class AMQSession implements SessionCallback {
private OpenWireProtocolManager manager;
// The sessionWireformat used by the session
// this object is meant to be used per thread / session
// so we make a new one per AMQSession
private final OpenWireMessageConverter converter;
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@ -90,10 +98,18 @@ public class AMQSession implements SessionCallback {
OpenWireProtocolManager manager) {
this.connInfo = connInfo;
this.sessInfo = sessInfo;
this.server = server;
this.connection = connection;
this.scheduledPool = scheduledPool;
this.manager = manager;
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
this.converter = new OpenWireMessageConverter(marshaller.copy());
}
public OpenWireMessageConverter getConverter() {
return converter;
}
public void initialize() {
@ -106,7 +122,7 @@ public class AMQSession implements SessionCallback {
// now
try {
coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true);
coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
@ -119,7 +135,9 @@ public class AMQSession implements SessionCallback {
}
public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
public List<AMQConsumer> createConsumer(ConsumerInfo info,
AMQSession amqSession,
SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
ActiveMQDestination[] dests = null;
@ -129,28 +147,45 @@ public class AMQSession implements SessionCallback {
else {
dests = new ActiveMQDestination[]{dest};
}
Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
for (ActiveMQDestination d : dests) {
if (d.isQueue()) {
SimpleString queueName = OpenWireUtil.toCoreAddress(d);
// Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
List<AMQConsumer> consumersList = new java.util.LinkedList<>();
for (ActiveMQDestination openWireDest : dests) {
if (openWireDest.isQueue()) {
SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
getCoreServer().getJMSQueueCreator().create(queueName);
}
AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
consumer.init();
consumerMap.put(d, consumer);
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
consumersList.add(consumer);
consumers.put(consumer.getNativeId(), consumer);
}
connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
return consumersList;
}
public void start() {
coreSession.start();
started.set(true);
}
// rename actualDest to destination
@Override
public void afterDelivery() throws Exception {
}
@Override
public void browserFinished(ServerConsumer consumer) {
AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
if (theConsumer != null) {
theConsumer.browseFinished();
}
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
@ -197,8 +232,15 @@ public class AMQSession implements SessionCallback {
@Override
public boolean hasCredits(ServerConsumer consumerID) {
AMQConsumer amqConsumer = consumers.get(consumerID.getID());
return amqConsumer.hasCredits();
AMQConsumer amqConsumer;
amqConsumer = consumers.get(consumerID.getID());
if (amqConsumer != null) {
return amqConsumer.hasCredits();
}
return false;
}
@Override
@ -207,6 +249,133 @@ public class AMQSession implements SessionCallback {
}
public void send(final ProducerInfo producerInfo,
final Message messageSend,
boolean sendProducerAck) throws Exception {
TransactionId tid = messageSend.getTransactionId();
if (tid != null) {
resetSessionTx(tid);
}
messageSend.setBrokerInTime(System.currentTimeMillis());
ActiveMQDestination destination = messageSend.getDestination();
ActiveMQDestination[] actualDestinations = null;
if (destination.isComposite()) {
actualDestinations = destination.getCompositeDestinations();
messageSend.setOriginalDestination(destination);
}
else {
actualDestinations = new ActiveMQDestination[]{destination};
}
ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
Runnable runnable;
if (sendProducerAck) {
runnable = new Runnable() {
public void run() {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchSync(ack);
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
};
}
else {
final Connection transportConnection = connection.getTransportConnection();
// new Exception("Setting to false").printStackTrace();
if (transportConnection == null) {
// I don't think this could happen, but just in case, avoiding races
runnable = null;
}
else {
runnable = new Runnable() {
public void run() {
transportConnection.setAutoRead(true);
}
};
}
}
internalSend(actualDestinations, originalCoreMsg, runnable);
}
private void internalSend(ActiveMQDestination[] actualDestinations,
ServerMessage originalCoreMsg,
final Runnable onComplete) throws Exception {
Runnable runToUse;
if (actualDestinations.length <= 1 || onComplete == null) {
// if onComplete is null, this will be null ;)
runToUse = onComplete;
}
else {
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
runToUse = new Runnable() {
@Override
public void run() {
if (count.decrementAndGet() == 0) {
onComplete.run();
}
}
};
}
SimpleString[] addresses = new SimpleString[actualDestinations.length];
PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
// We fillup addresses, pagingStores and we will throw failure if that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
addresses[i] = OpenWireUtil.toCoreAddress(dest);
pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
throw new ResourceAllocationException("Queue is full");
}
}
for (int i = 0; i < actualDestinations.length; i++) {
ServerMessage coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(addresses[i]);
PagingStore store = pagingStores[i];
if (store.isFull()) {
connection.getTransportConnection().setAutoRead(false);
}
getCoreSession().send(coreMsg, false);
if (runToUse != null) {
// if the timeout is >0, it will wait this much milliseconds
// before running the the runToUse
// this will eventually unblock blocked destinations
// playing flow control
store.checkMemory(runToUse);
}
}
}
public AMQServerSession getCoreSession() {
return this.coreSession;
}
@ -222,72 +391,6 @@ public class AMQSession implements SessionCallback {
consumers.remove(consumerId);
}
public void createProducer(ProducerInfo info) throws Exception {
AMQProducer producer = new AMQProducer(this, info);
producer.init();
producers.put(info.getProducerId().getValue(), producer);
}
public void removeProducer(ProducerInfo info) {
removeProducer(info.getProducerId());
}
public void removeProducer(ProducerId id) {
producers.remove(id.getValue());
}
public SendingResult send(AMQProducerBrokerExchange producerExchange,
Message messageSend,
boolean sendProducerAck) throws Exception {
SendingResult result = new SendingResult();
TransactionId tid = messageSend.getTransactionId();
if (tid != null) {
resetSessionTx(tid);
}
messageSend.setBrokerInTime(System.currentTimeMillis());
ActiveMQDestination destination = messageSend.getDestination();
ActiveMQDestination[] actualDestinations = null;
if (destination.isComposite()) {
actualDestinations = destination.getCompositeDestinations();
}
else {
actualDestinations = new ActiveMQDestination[]{destination};
}
for (ActiveMQDestination dest : actualDestinations) {
ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
SimpleString address = OpenWireUtil.toCoreAddress(dest);
coreMsg.setAddress(address);
PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
if (store.isFull()) {
result.setBlockNextSend(true);
result.setBlockPagingStore(store);
result.setBlockingAddress(address);
//now we hold this message send until the store has space.
//we do this by put it in a scheduled task
ScheduledExecutorService scheduler = server.getScheduledPool();
Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
}
else {
coreSession.send(coreMsg, false);
}
}
return result;
}
public WireFormat getMarshaller() {
return this.connection.getMarshaller();
}
@ -449,87 +552,17 @@ public class AMQSession implements SessionCallback {
return consumers.get(coreConsumerId);
}
private class SendRetryTask implements Runnable {
private ServerMessage coreMsg;
private AMQProducerBrokerExchange producerExchange;
private boolean sendProducerAck;
private int msgSize;
private int commandId;
public SendRetryTask(ServerMessage coreMsg,
AMQProducerBrokerExchange producerExchange,
boolean sendProducerAck,
int msgSize,
int commandId) {
this.coreMsg = coreMsg;
this.producerExchange = producerExchange;
this.sendProducerAck = sendProducerAck;
this.msgSize = msgSize;
this.commandId = commandId;
}
@Override
public void run() {
synchronized (AMQSession.this) {
try {
// check pageStore
SimpleString address = coreMsg.getAddress();
PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
if (store.isFull()) {
// if store is still full, schedule another
server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS);
}
else {
// now send the message again.
coreSession.send(coreMsg, false);
if (sendProducerAck) {
ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize);
connection.dispatchAsync(ack);
}
else {
Response response = new Response();
response.setCorrelationId(commandId);
connection.dispatchAsync(response);
}
}
}
catch (Exception e) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(commandId);
connection.dispatchAsync(response);
}
public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
Iterator<AMQConsumer> iterator = consumers.values().iterator();
while (iterator.hasNext()) {
AMQConsumer consumer = iterator.next();
if (consumer.getId().equals(consumerId)) {
consumer.setPrefetchSize(prefetch);
}
}
}
public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
SendingResult result) throws IOException {
long start = System.currentTimeMillis();
long nextWarn = start;
producerExchange.blockingOnFlowControl(true);
AMQConnectionContext context = producerExchange.getConnectionContext();
PagingStoreImpl store = result.getBlockPagingStore();
//Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
long blockedProducerWarningInterval = 30000;
ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
while (store.isFull()) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000));
nextWarn = now + blockedProducerWarningInterval;
}
}
producerExchange.blockingOnFlowControl(false);
public OpenWireConnection getConnection() {
return connection;
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.util.ByteSequence;
public class OpenWireUtil {
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
return buffer;
}
public static SimpleString toCoreAddress(ActiveMQDestination dest) {
if (dest.isQueue()) {
return new SimpleString("jms.queue." + dest.getPhysicalName());
}
else {
return new SimpleString("jms.topic." + dest.getPhysicalName());
}
}
/**
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
* set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
* consumer
*/
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
String address = message.getAddress().toString();
String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
}
else {
return new ActiveMQTopic(strippedAddress);
}
}
/*
*This util converts amq wildcards to compatible core wildcards
*The conversion is like this:
*AMQ * wildcard --> Core * wildcard (no conversion)
*AMQ > wildcard --> Core # wildcard
*/
public static String convertWildcard(String physicalName) {
return physicalName.replaceAll("(\\.>)+", ".#");
}
public static XidImpl toXID(TransactionId xaXid) {
return toXID((XATransactionId)xaXid);
}
public static XidImpl toXID(XATransactionId xaXid) {
return new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
}
}

View File

@ -112,6 +112,11 @@ public class StompSession implements SessionCallback {
}
}
@Override
public void browserFinished(ServerConsumer consumer) {
}
@Override
public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null;

View File

@ -947,4 +947,8 @@ public interface Configuration {
StoreConfiguration getStoreConfiguration();
Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
/** It will return all the connectors in a toString manner for debug purposes. */
String debugConnectors();
}

View File

@ -21,7 +21,9 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.URI;
import java.security.AccessController;
@ -1299,6 +1301,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
System.out.println(debugConnectors());
for (String connectorName : connectorNames) {
TransportConfiguration connector = getConnectorConfigurations().get(connectorName);
@ -1314,6 +1318,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
return tcConfigs;
}
public String debugConnectors() {
StringWriter stringWriter = new StringWriter();
PrintWriter writer = new PrintWriter(stringWriter);
for (Map.Entry<String, TransportConfiguration> connector : getConnectorConfigurations().entrySet()) {
writer.println("Connector::" + connector.getKey() + " value = " + connector.getValue());
}
writer.close();
return stringWriter.toString();
}
@Override
public boolean isResolveProtocols() {
return resolveProtocols;

View File

@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent {
boolean checkMemory(Runnable runnable);
boolean isFull();
/**
* Write lock the PagingStore.
*

View File

@ -98,6 +98,10 @@ public final class CoreSessionCallback implements SessionCallback {
channel.send(packet);
}
@Override
public void browserFinished(ServerConsumer consumer) {
}
@Override
public void afterDelivery() throws Exception {

View File

@ -137,6 +137,12 @@ public class InVMConnection implements Connection {
}
}
@Override
public void setAutoRead(boolean autoRead) {
// nothing to be done on the INVM.
// maybe we could eventually implement something, but not needed now
}
@Override
public ActiveMQBuffer createTransportBuffer(final int size) {
return ActiveMQBuffers.dynamicBuffer(size);

View File

@ -169,6 +169,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final long connectionsAllowed;
private Map<String, Object> extraConfigs;
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,

View File

@ -146,7 +146,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
// this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
// this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory);
if (config.isResolveProtocols()) {
@ -206,8 +206,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
"-" +
System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
"-" +
System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
}
});

View File

@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
Queue locateQueue(SimpleString queueName);
BindingQueryResult bindingQuery(SimpleString address) throws Exception;
QueueQueryResult queueQuery(SimpleString name) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;

View File

@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public interface ServerConsumer extends Consumer {
void setlowConsumerDetection(SlowConsumerDetectionListener listener);
SlowConsumerDetectionListener getSlowConsumerDetecion();
void fireSlowConsumer();
/**
* @param protocolContext
* @see #getProtocolContext()

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
interface BrowserListener {
package org.apache.activemq.artemis.core.server;
void browseFinished();
public interface SlowConsumerDetectionListener {
void onSlowConsumer(ServerConsumer consumer);
}

View File

@ -69,6 +69,11 @@ public class EmbeddedActiveMQ {
* @return
*/
public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception {
if (activeMQServer.getClusterManager().getClusterConnections() == null ||
activeMQServer.getClusterManager().getClusterConnections().size() == 0) {
return servers == 0;
}
for (int i = 0; i < iterations; i++) {
for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) {
if (connection.getTopology().getMembers().size() == servers) {

View File

@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServerSessionFactory;
@ -544,6 +549,72 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return postOffice.isAddressBound(SimpleString.toSimpleString(address));
}
@Override
public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
if (address == null) {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
}
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
List<SimpleString> names = new ArrayList<>();
// make an exception for the management address (see HORNETQ-29)
ManagementService managementService = getManagementService();
if (managementService != null) {
if (address.equals(managementService.getManagementAddress())) {
return new BindingQueryResult(true, names, autoCreateJmsQueues);
}
}
Bindings bindings = getPostOffice().getMatchingBindings(address);
for (Binding binding : bindings.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
names.add(binding.getUniqueName());
}
}
return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
}
@Override
public QueueQueryResult queueQuery(SimpleString name) {
if (name == null) {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
QueueQueryResult response;
Binding binding = getPostOffice().getBinding(name);
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
Filter filter = queue.getFilter();
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
}
// make an exception for the management address (see HORNETQ-29)
else if (name.equals(managementAddress)) {
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
}
else if (autoCreateJmsQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
}
else {
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
}
return response;
}
@Override
public void threadDump() {
StringWriter str = new StringWriter();

View File

@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue {
}
}
serverConsumer.fireSlowConsumer();
if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) {

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -90,6 +91,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final ActiveMQServer server;
private SlowConsumerDetectionListener slowConsumerListener;
/**
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@ -227,6 +230,23 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// ServerConsumer implementation
// ----------------------------------------------------------------------
@Override
public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
this.slowConsumerListener = listener;
}
@Override
public SlowConsumerDetectionListener getSlowConsumerDetecion() {
return slowConsumerListener;
}
@Override
public void fireSlowConsumer() {
if (slowConsumerListener != null) {
slowConsumerListener.onSlowConsumer(this);
}
}
@Override
public Object getProtocolContext() {
return protocolContext;
@ -546,12 +566,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
else {
refs.add(ref);
if (!failed) {
// We don't decrement delivery count if the client failed, since there's a possibility that refs
// were actually delivered but we just didn't get any acks for them
// before failure
ref.decrementDeliveryCount();
}
updateDeliveryCountForCanceledRef(ref, failed);
}
if (isTrace) {
@ -566,6 +581,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return refs;
}
protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
if (!failed) {
// We don't decrement delivery count if the client failed, since there's a possibility that refs
// were actually delivered but we just didn't get any acks for them
// before failure
ref.decrementDeliveryCount();
}
}
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
@ -1191,6 +1215,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref = null;
synchronized (messageQueue) {
if (!iterator.hasNext()) {
callback.browserFinished(ServerConsumerImpl.this);
break;
}

View File

@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
if (name == null) {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
QueueQueryResult response;
Binding binding = postOffice.getBinding(name);
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
Queue queue = (Queue) binding.getBindable();
Filter filter = queue.getFilter();
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
}
// make an exception for the management address (see HORNETQ-29)
else if (name.equals(managementAddress)) {
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
}
else if (autoCreateJmsQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
}
else {
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
}
return response;
return server.queueQuery(name);
}
@Override
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
if (address == null) {
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
}
boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
List<SimpleString> names = new ArrayList<>();
// make an exception for the management address (see HORNETQ-29)
if (address.equals(managementAddress)) {
return new BindingQueryResult(true, names, autoCreateJmsQueues);
}
Bindings bindings = postOffice.getMatchingBindings(address);
for (Binding binding : bindings.getBindings()) {
if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
names.add(binding.getUniqueName());
}
}
return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
return server.bindingQuery(address);
}
@Override

View File

@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
public static final int DEFAULT_QUEUE_PREFETCH = 1000;
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
public AddressSettings(AddressSettings other) {
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
this.maxSizeBytes = other.maxSizeBytes;
@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
this.managementBrowsePageSize = other.managementBrowsePageSize;
this.queuePrefetch = other.queuePrefetch;
}
public AddressSettings() {
@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
public int getQueuePrefetch() {
return queuePrefetch != null ? queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH;
}
public AddressSettings setQueuePrefetch(int queuePrefetch) {
this.queuePrefetch = queuePrefetch;
return this;
}
/**
* merge 2 objects in to 1
*
@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (managementBrowsePageSize == null) {
managementBrowsePageSize = merged.managementBrowsePageSize;
}
if (queuePrefetch == null) {
queuePrefetch = merged.queuePrefetch;
}
}
@Override
@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
return result;
}
@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
return false;
if (queuePrefetch == null) {
if (other.queuePrefetch != null)
return false;
}
else if (!queuePrefetch.equals(other.queuePrefetch))
return false;
return true;
}

View File

@ -50,4 +50,7 @@ public interface SessionCallback {
void disconnect(ServerConsumer consumerId, String queueName);
boolean isWritable(ReadyListener callback);
/** Some protocols (Openwire) needs a special message with the browser is finished. */
void browserFinished(ServerConsumer consumer);
}

View File

@ -17,7 +17,9 @@
package org.apache.activemq.artemis.tests.util;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
@ -26,6 +28,7 @@ import org.junit.rules.ExternalResource;
* This is useful to make sure you won't have leaking threads between tests
*/
public class ThreadLeakCheckRule extends ExternalResource {
private static Set<String> extraThreads = new HashSet<String>();
boolean enabled = true;
@ -94,6 +97,11 @@ public class ThreadLeakCheckRule extends ExternalResource {
}
public static void addExtraThreads(String... threads) {
for (String th : threads) {
extraThreads.add(th);
}
}
private boolean checkThread() {
boolean failedThread = false;
@ -183,6 +191,9 @@ public class ThreadLeakCheckRule extends ExternalResource {
// Static workers used by MQTT client.
return true;
}
else if (extraThreads.contains(threadName)) {
return true;
}
else {
for (StackTraceElement element : thread.getStackTrace()) {
if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
@ -194,4 +205,7 @@ public class ThreadLeakCheckRule extends ExternalResource {
}
public static void clearExtraThreads() {
extraThreads.clear();
}
}

View File

@ -116,7 +116,7 @@ public class JmsRollbackRedeliveryTest {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(5000);
TextMessage msg = (TextMessage) consumer.receive(6000000);
if (msg != null) {
if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());

View File

@ -54,12 +54,12 @@ public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
@BeforeClass
public static void beforeTest() throws Exception {
//this thread keeps alive in original test too. Exclude it.
ThreadLeakCheckRule.addKownThread("WriteTimeoutFilter-Timeout");
ThreadLeakCheckRule.addExtraThreads("WriteTimeoutFilter-Timeout-1");
}
@AfterClass
public static void afterTest() throws Exception {
ThreadLeakCheckRule.removeKownThread("WriteTimeoutFilter-Timeout");
ThreadLeakCheckRule.clearExtraThreads();
}
@Before

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -38,7 +39,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.jboss.byteman.contrib.bmunit.BMRule;
@ -83,8 +87,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
name = "stop broker before commit",
targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
targetMethod = "processCommitTransactionOnePhase",
targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
targetMethod = "commit",
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"),})
public void testFailoverConsumerDups() throws Exception {
@ -177,10 +181,10 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
@BMRule(
name = "stop broker before commit",
targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
targetMethod = "processCommitTransactionOnePhase",
targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
targetMethod = "commit",
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return null")})
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false);
}
@ -194,8 +198,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
name = "stop broker after commit",
targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
targetMethod = "processCommitTransactionOnePhase",
targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
targetMethod = "commit",
targetLocation = "AT EXIT",
action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")})
public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
@ -232,13 +236,11 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
testConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
LOG.info("consume one: " + message);
LOG.info("consume one and commit: " + message);
assertNotNull("got message", message);
receivedMessages.add((TextMessage) message);
try {
LOG.info("send one");
produceMessage(consumerSession, signalDestination, 1);
LOG.info("commit session");
consumerSession.commit();
}
catch (JMSException e) {
@ -270,8 +272,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
// will be stopped by the plugin
brokerStopLatch.await();
doByteman.set(false);
server.stop();
doByteman.set(false);
server = createBroker();
server.start();

View File

@ -519,31 +519,31 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
// @Test
// @BMRules(
// rules = {
// @BMRule(
// name = "set no return response and stop the broker",
// targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
// targetMethod = "processMessageAck",
// targetLocation = "ENTRY",
// action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
// }
// )
// public void testFailoverConsumerAckLost() throws Exception {
// LOG.info(this + " running test testFailoverConsumerAckLost");
// // as failure depends on hash order of state tracker recovery, do a few times
// for (int i = 0; i < 3; i++) {
// try {
// LOG.info("Iteration: " + i);
// doTestFailoverConsumerAckLost(i);
// }
// finally {
// stopBroker();
// }
// }
// }
//
@Test
@BMRules(
rules = {
@BMRule(
name = "set no return response and stop the broker",
targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
targetMethod = "processMessageAck",
targetLocation = "ENTRY",
action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
}
)
public void testFailoverConsumerAckLost() throws Exception {
LOG.info(this + " running test testFailoverConsumerAckLost");
// as failure depends on hash order of state tracker recovery, do a few times
for (int i = 0; i < 3; i++) {
try {
LOG.info("Iteration: " + i);
doTestFailoverConsumerAckLost(i);
}
finally {
stopBroker();
}
}
}
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
broker = createBroker();
@ -567,12 +567,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
connection = cf.createConnection();
connection.start();
connections.add(connection);
final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
@ -583,7 +583,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
final Vector<Message> receivedMessages = new Vector<>();
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
new Thread() {
public void run() {
LOG.info("doing async commit after consume...");
try {
@ -630,16 +630,10 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
e.printStackTrace();
}
}
};
t.start();
}.start();
// will be stopped by the plugin
brokerStopLatch.await(60, TimeUnit.SECONDS);
t.join(30000);
if (t.isAlive()) {
t.interrupt();
Assert.fail("Thread " + t.getName() + " is still alive");
}
broker = createBroker();
broker.start();
doByteman.set(false);
@ -1062,10 +1056,8 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
new Thread() {
public void run() {
try {
if (broker != null) {
broker.stop();
broker = null;
}
broker.stop();
broker = null;
LOG.info("broker stopped.");
}
catch (Exception e) {

View File

@ -50,10 +50,10 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSessionFactory;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@ -507,7 +507,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
@ -518,7 +518,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
try {
return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
return targetCallback.sendMessage(message, consumer, deliveryCount);
}
finally {
callbackSemaphore.release();
@ -530,8 +530,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
}
/* (non-Javadoc)
@ -581,6 +581,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
String defaultAddress,
SessionCallback callback,
OperationContext context,
ServerSessionFactory sessionFactory,
boolean autoCreateQueue) throws Exception {
return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
}

View File

@ -26,7 +26,6 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -119,7 +118,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
}
@Test
public void testSendnReceiveAuthorization() throws Exception {
public void testSendnReceiveAuthorization() throws Exception {
Connection sendingConn = null;
Connection receivingConn = null;
@ -153,18 +152,16 @@ public class BasicSecurityTest extends BasicOpenWireTest {
producer = sendingSession.createProducer(dest);
producer.send(message);
MessageConsumer consumer;
MessageConsumer consumer = null;
try {
consumer = sendingSession.createConsumer(dest);
Assert.fail("exception expected");
}
catch (JMSSecurityException e) {
e.printStackTrace();
//expected
}
consumer = receivingSession.createConsumer(dest);
TextMessage received = (TextMessage) consumer.receive(5000);
TextMessage received = (TextMessage) consumer.receive();
assertNotNull(received);
assertEquals("Hello World", received.getText());

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import static org.junit.Assert.assertEquals;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.junit.Test;
public class OpenWireUtilTest {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -26,27 +27,24 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class SimpleOpenWireTest extends BasicOpenWireTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
@Override
@Before
public void setUp() throws Exception {
@ -54,158 +52,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
super.setUp();
}
@Test
public void testSimple() throws Exception {
Connection connection = factory.createConnection();
Collection<Session> sessions = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
sessions.add(session);
}
connection.close();
}
@Test
public void testTransactionalSimple() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
session.commit();
Assert.assertNull(consumer.receive(100));
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertEquals("test", message.getText());
Assert.assertNotNull(message);
message.acknowledge();
}
}
@Test
public void testXASimple() throws Exception {
XAConnection connection = xaFactory.createXAConnection();
Collection<Session> sessions = new LinkedList<>();
for (int i = 0; i < 10; i++) {
XASession session = connection.createXASession();
session.getXAResource().start(newXID(), XAResource.TMNOFLAGS);
sessions.add(session);
}
connection.close();
}
@Test
public void testClientACK() throws Exception {
try {
Connection connection = factory.createConnection();
Collection<Session> sessions = new LinkedList<>();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
Assert.assertNull(consumer.receive(100));
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
connection.close();
System.err.println("Done!!!");
}
catch (Throwable e) {
e.printStackTrace();
}
}
@Test
public void testRollback() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
producer.send(session.createTextMessage("test2"));
connection.start();
Assert.assertNull(consumer.receiveNoWait());
session.rollback();
producer.send(session.createTextMessage("test2"));
Assert.assertNull(consumer.receiveNoWait());
session.commit();
TextMessage msg = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("test2", msg.getText());
}
}
@Test
public void testAutoAck() throws Exception {
Connection connection = factory.createConnection();
Collection<Session> sessions = new LinkedList<>();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
TextMessage msg = session.createTextMessage("test");
msg.setStringProperty("abc", "testAutoACK");
producer.send(msg);
Assert.assertNull(consumer.receive(100));
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
connection.close();
System.err.println("Done!!!");
}
@Test
public void testProducerFlowControl() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
factory.setProducerWindowSize(1024 * 64);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("test"));
connection.close();
}
@Test
public void testSimpleQueue() throws Exception {
connection.start();
@ -242,11 +88,12 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
session.close();
}
// @Test -- ignored for now
@Test
public void testKeepAlive() throws Exception {
connection.start();
Thread.sleep(30000);
Thread.sleep(125000);
connection.createSession(false, 1);
}
@ -390,11 +237,9 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("foo");
try {
session.createProducer(queue);
}
catch (JMSException expected) {
}
thrown.expect(InvalidDestinationException.class);
thrown.expect(JMSException.class);
session.createProducer(queue);
session.close();
}
@ -545,6 +390,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
/**
* This is the example shipped with the distribution
*
@ -627,6 +473,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
// simple test sending openwire, consuming core
@Test
public void testMixedOpenWireExample2() throws Exception {
@ -666,396 +513,5 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
conn2.close();
}
@Test
public void testXAConsumer() throws Exception {
Queue queue;
try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("test" + i);
msg.setStringProperty("myobj", "test" + i);
producer.send(msg);
}
session.close();
}
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
Xid xid = newXID();
XASession session = xaconnection.createXASession();
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
MessageConsumer consumer = session.createConsumer(queue);
xaconnection.start();
for (int i = 0; i < 5; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("test" + i, message.getText());
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().rollback(xid);
consumer.close();
xaconnection.close();
}
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
// Assert.assertEquals("test" + i, message.getText());
System.out.println("Message " + message.getText());
}
checkDuplicate(consumer);
System.out.println("Queue:" + queue);
session.close();
}
System.err.println("Done!!!");
}
@Test
public void testXASameConsumerRollback() throws Exception {
Queue queue;
try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("test" + i);
msg.setStringProperty("myobj", "test" + i);
producer.send(msg);
}
session.close();
}
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
Xid xid = newXID();
XASession session = xaconnection.createXASession();
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
MessageConsumer consumer = session.createConsumer(queue);
xaconnection.start();
for (int i = 0; i < 5; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("test" + i, message.getText());
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().rollback(xid);
xid = newXID();
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("test" + i, message.getText());
}
checkDuplicate(consumer);
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().commit(xid, true);
}
}
@Test
public void testXAPrepare() throws Exception {
try {
XAConnection connection = xaFactory.createXAConnection();
XASession xasession = connection.createXASession();
Xid xid = newXID();
xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
Queue queue = xasession.createQueue(queueName);
MessageProducer producer = xasession.createProducer(queue);
producer.send(xasession.createTextMessage("hello"));
producer.send(xasession.createTextMessage("hello"));
xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
xasession.getXAResource().prepare(xid);
connection.close();
System.err.println("Done!!!");
}
catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testAutoSend() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("testXX" + i));
}
connection.start();
for (int i = 0; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
}
@Test
public void testCommitCloseConsumerBefore() throws Exception {
testCommitCloseConsumer(true);
}
@Test
public void testCommitCloseConsumerAfter() throws Exception {
testCommitCloseConsumer(false);
}
private void testCommitCloseConsumer(boolean closeBefore) throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("testXX" + i);
msg.setStringProperty("count", "str " + i);
producer.send(msg);
}
session.commit();
connection.start();
for (int i = 0; i < 5; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
if (closeBefore) {
consumer.close();
}
session.commit();
// we're testing two scenarios.
// closing the consumer before commit or after commit
if (!closeBefore) {
consumer.close();
}
consumer = session.createConsumer(queue);
// Assert.assertNull(consumer.receiveNoWait());
for (int i = 5; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
@Test
public void testRollbackWithAcked() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("testXX" + i);
msg.setStringProperty("count", "str " + i);
producer.send(msg);
}
session.commit();
connection.start();
for (int i = 0; i < 5; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
session.rollback();
consumer.close();
consumer = session.createConsumer(queue);
// Assert.assertNull(consumer.receiveNoWait());
for (int i = 0; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
// System.out.println("TXT::" + txt);
Assert.assertNotNull(txt);
System.out.println("TXT " + txt.getText());
// Assert.assertEquals("testXX" + i, txt.getText());
}
session.commit();
checkDuplicate(consumer);
}
@Test
public void testRollbackLocal() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("testXX" + i);
msg.setStringProperty("count", "str " + i);
producer.send(msg);
}
session.commit();
connection.start();
for (int i = 0; i < 5; i++) {
TextMessage txt = (TextMessage) consumer.receive(500);
Assert.assertEquals("testXX" + i, txt.getText());
}
session.rollback();
for (int i = 0; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(txt);
System.out.println("TXT " + txt.getText());
Assert.assertEquals("testXX" + i, txt.getText());
}
checkDuplicate(consumer);
session.commit();
}
private void checkDuplicate(MessageConsumer consumer) throws JMSException {
boolean duplicatedMessages = false;
while (true) {
TextMessage txt = (TextMessage) consumer.receiveNoWait();
if (txt == null) {
break;
}
else {
duplicatedMessages = true;
System.out.println("received in duplicate:" + txt.getText());
}
}
Assert.assertFalse("received messages in duplicate", duplicatedMessages);
}
@Test
public void testIndividualAck() throws Exception {
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("testXX" + i);
msg.setStringProperty("count", "str " + i);
producer.send(msg);
}
connection.start();
for (int i = 0; i < 5; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
if (i == 4) {
txt.acknowledge();
}
Assert.assertEquals("testXX" + i, txt.getText());
}
consumer.close();
consumer = session.createConsumer(queue);
// Assert.assertNull(consumer.receiveNoWait());
for (int i = 0; i < 4; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
txt.acknowledge();
Assert.assertEquals("testXX" + i, txt.getText());
}
for (int i = 5; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
txt.acknowledge();
Assert.assertEquals("testXX" + i, txt.getText());
}
checkDuplicate(consumer);
Assert.assertNull(consumer.receiveNoWait());
}
@Test
public void testCommitCloseConsumeXA() throws Exception {
Queue queue;
{
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage msg = session.createTextMessage("testXX" + i);
msg.setStringProperty("count", "str " + i);
producer.send(msg);
}
session.commit();
}
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
xaconnection.start();
XASession xasession = xaconnection.createXASession();
Xid xid = newXID();
xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
MessageConsumer consumer = xasession.createConsumer(queue);
for (int i = 0; i < 5; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
consumer.close();
xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
xasession.getXAResource().prepare(xid);
xasession.getXAResource().commit(xid, false);
xaconnection.close();
}
{
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int i = 5; i < 10; i++) {
TextMessage txt = (TextMessage) consumer.receive(5000);
Assert.assertEquals("testXX" + i, txt.getText());
}
}
}
}
}

View File

@ -104,16 +104,6 @@ public class BindingsImplTest extends ActiveMQTestBase {
private final class FakeTransaction implements Transaction {
@Override
public Object getProtocolData() {
return null;
}
@Override
public void setProtocolData(Object data) {
}
@Override
public void addOperation(final TransactionOperation sync) {

View File

@ -41,11 +41,6 @@ public class FakeQueue implements Queue {
return false;
}
@Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
}