diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 58d51db7d3..2682e0facf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -92,10 +92,6 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); - public Object getProtonLock() { - return connection.getLock(); - } - public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -203,19 +199,31 @@ public class AMQPSessionCallback implements SessionCallback { serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); } - public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); + public void createTemporaryQueue(String address, + String queueName, + RoutingType routingType, + String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); } - public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + public void createUnsharedDurableQueue(String address, + RoutingType routingType, + String queueName, + String filter) throws Exception { serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); } - public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + public void createSharedDurableQueue(String address, + RoutingType routingType, + String queueName, + String filter) throws Exception { serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); } - public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + public void createSharedVolatileQueue(String address, + RoutingType routingType, + String queueName, + String filter) throws Exception { serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); } @@ -250,7 +258,9 @@ public class AMQPSessionCallback implements SessionCallback { return bindingQueryResult.isExists(); } - public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception { + public AddressQueryResult addressQuery(String addressName, + RoutingType routingType, + boolean autoCreate) throws Exception { AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { @@ -395,9 +405,13 @@ public class AMQPSessionCallback implements SessionCallback { condition.setDescription(errorMessage); Rejected rejected = new Rejected(); rejected.setError(condition); - synchronized (connection.getLock()) { + + connection.lock(); + try { delivery.disposition(rejected); delivery.settle(); + } finally { + connection.unlock(); } connection.flush(); } @@ -415,7 +429,8 @@ public class AMQPSessionCallback implements SessionCallback { manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override public void done() { - synchronized (connection.getLock()) { + connection.lock(); + try { if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); @@ -426,15 +441,20 @@ public class AMQPSessionCallback implements SessionCallback { delivery.disposition(Accepted.getInstance()); } delivery.settle(); + } finally { + connection.unlock(); } connection.flush(); } @Override public void onError(int errorCode, String errorMessage) { - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); connection.flush(); + } finally { + connection.unlock(); } } }); @@ -449,9 +469,12 @@ public class AMQPSessionCallback implements SessionCallback { final Receiver receiver) { try { if (address == null) { - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.flow(credits); connection.flush(); + } finally { + connection.unlock(); } return; } @@ -505,9 +528,12 @@ public class AMQPSessionCallback implements SessionCallback { try { return plugSender.deliverMessage(ref, deliveryCount); } catch (Exception e) { - synchronized (connection.getLock()) { + connection.lock(); + try { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); connection.flush(); + } finally { + connection.unlock(); } throw new IllegalStateException("Can't deliver message " + e, e); } @@ -538,13 +564,14 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void disconnect(ServerConsumer consumer, String queueName) { ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); + connection.lock(); try { - synchronized (connection.getLock()) { - ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec); - connection.flush(); - } + ((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec); + connection.flush(); } catch (ActiveMQAMQPException e) { logger.error("Error closing link for " + consumer.getQueue().getAddress()); + } finally { + connection.unlock(); } } @@ -567,18 +594,18 @@ public class AMQPSessionCallback implements SessionCallback { return protonSPI.newTransaction(); } - public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { return serverSession.getMatchingQueue(address, routingType); } - - public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + public SimpleString getMatchingQueue(SimpleString address, + SimpleString queueName, + RoutingType routingType) throws Exception { return serverSession.getMatchingQueue(address, queueName, routingType); } public AddressInfo getAddress(SimpleString address) { - return serverSession.getAddress(address); + return serverSession.getAddress(address); } public void removeTemporaryQueue(String address) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index a884f0da39..2c968c7571 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; @@ -128,10 +129,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return false; } - public Object getLock() { + public ReentrantLock getLock() { return handler.getLock(); } + public void lock() { + handler.getLock().lock(); + } + + public void unlock() { + handler.getLock().unlock(); + } + public int capacity() { return handler.capacity(); } @@ -319,7 +328,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH handler.flushBytes(); } - @Override public void pushBytes(ByteBuf bytes) { connectionCallback.onTransport(bytes, this); @@ -327,7 +335,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteOpen(Connection connection) throws Exception { - synchronized (getLock()) { + lock(); + try { try { initInternal(); } catch (Exception e) { @@ -342,6 +351,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); connection.open(); } + } finally { + unlock(); } initialise(); @@ -367,9 +378,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteClose(Connection connection) { - synchronized (getLock()) { + lock(); + try { connection.close(); connection.free(); + } finally { + unlock(); } for (AMQPSessionContext protonSession : sessions.values()) { @@ -390,8 +404,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteOpen(Session session) throws Exception { getSessionExtension(session).initialise(); - synchronized (getLock()) { + lock(); + try { session.open(); + } finally { + unlock(); } } @@ -401,9 +418,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteClose(Session session) throws Exception { - synchronized (getLock()) { + lock(); + try { session.close(); session.free(); + } finally { + unlock(); } AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); @@ -428,10 +448,14 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteClose(Link link) throws Exception { - synchronized (getLock()) { + lock(); + try { link.close(); link.free(); + } finally { + unlock(); } + ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); if (linkContext != null) { linkContext.close(true); @@ -440,11 +464,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH @Override public void onRemoteDetach(Link link) throws Exception { - synchronized (getLock()) { + lock(); + try { link.detach(); link.free(); + } finally { + unlock(); } - } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index c2c1f2d060..72833e3711 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -147,9 +147,12 @@ public class AMQPSessionContext extends ProtonInitializable { coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); receiver.setContext(transactionHandler); - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.open(); receiver.flow(connection.getAmqpCredits()); + } finally { + connection.unlock(); } } @@ -163,16 +166,23 @@ public class AMQPSessionContext extends ProtonInitializable { senders.put(sender, protonSender); serverSenders.put(protonSender.getBrokerConsumer(), protonSender); sender.setContext(protonSender); - synchronized (connection.getLock()) { + connection.lock(); + try { sender.open(); + } finally { + connection.unlock(); } + protonSender.start(); } catch (ActiveMQAMQPException e) { senders.remove(sender); sender.setSource(null); sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - synchronized (connection.getLock()) { + connection.lock(); + try { sender.close(); + } finally { + connection.unlock(); } } } @@ -191,15 +201,21 @@ public class AMQPSessionContext extends ProtonInitializable { protonReceiver.initialise(); receivers.put(receiver, protonReceiver); receiver.setContext(protonReceiver); - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.open(); + } finally { + connection.unlock(); } } catch (ActiveMQAMQPException e) { receivers.remove(receiver); receiver.setTarget(null); receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.close(); + } finally { + connection.unlock(); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 20ef1dfc9c..26064828a4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -117,7 +117,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (remoteDesiredCapabilities != null) { List list = Arrays.asList(remoteDesiredCapabilities); if (list.contains(AmqpSupport.DELAYED_DELIVERY)) { - receiver.setOfferedCapabilities(new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY}); } } } @@ -179,9 +179,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements condition.setCondition(Symbol.valueOf("failed")); condition.setDescription(e.getMessage()); rejected.setError(condition); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(rejected); delivery.settle(); + } finally { + connection.unlock(); } } } @@ -210,16 +213,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (sessionSPI != null) { sessionSPI.offerProducerCredit(address, credits, threshold, receiver); } else { - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.flow(credits); + } finally { + connection.unlock(); } connection.flush(); } } public void drain(int credits) { - synchronized (connection.getLock()) { + connection.lock(); + try { receiver.drain(credits); + } finally { + connection.unlock(); } connection.flush(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ca14f979b8..756a3d930c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -95,7 +96,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean isVolatile = false; private String tempQueueName; - public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { + public ProtonServerSenderContext(AMQPConnectionContext connection, + Sender sender, + AMQPSessionContext protonSession, + AMQPSessionCallback server) { super(); this.connection = connection; this.sender = sender; @@ -246,7 +250,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } //check to see if the client has defined how we act boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source); - if (clientDefined) { + if (clientDefined) { multicast = hasCapabilities(TOPIC, source); AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); if (!addressQueryResult.isExists()) { @@ -293,9 +297,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr supportedFilters.put(filter.getKey(), filter.getValue()); } - if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST ); + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST); queue = matchingAnycastQueue.toString(); } //if the address specifies a broker configured queue then we always use this, treat it as a queue @@ -313,8 +316,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (result.isExists()) { // If a client reattaches to a durable subscription with a different no-local // filter value, selector or address then we must recreate the queue (JMS semantics). - if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || - (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); @@ -392,7 +394,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { - brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); } catch (ActiveMQAMQPResourceLimitExceededException e1) { throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (Exception e) { @@ -404,7 +406,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return connection.getRemoteContainer(); } - /* * close the session */ @@ -415,8 +416,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.setCondition(condition); } protonSession.removeSender(sender); - synchronized (connection.getLock()) { + connection.lock(); + try { sender.close(); + } finally { + connection.unlock(); } connection.flush(); @@ -442,7 +446,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr Source source = (Source) sender.getSource(); if (source != null && source.getAddress() != null && multicast) { String queueName = source.getAddress(); - QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); + QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); } else { @@ -489,8 +493,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr DeliveryState remoteState; - synchronized (connection.getLock()) { + connection.lock(); + try { remoteState = delivery.getRemoteState(); + } finally { + connection.unlock(); } boolean settleImmediate = true; @@ -509,8 +516,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); txAccepted.setTxnId(txState.getTxnId()); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(txAccepted); + } finally { + connection.unlock(); } } // we have to individual ack as we can't guarantee we will get the delivery @@ -556,7 +566,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr Modified modification = (Modified) remoteState; if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { - message.rejectConsumer(((Consumer)brokerConsumer).sequentialID()); + message.rejectConsumer(((Consumer) brokerConsumer).sequentialID()); } if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { @@ -585,8 +595,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } public void settle(Delivery delivery) { - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.settle(); + } finally { + connection.unlock(); } } @@ -617,10 +630,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr int size = nettyBuffer.writerIndex(); - synchronized (connection.getLock()) { - if (sender.getLocalState() == EndpointState.CLOSED) { + while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) { + if (closed || sender.getLocalState() == EndpointState.CLOSED) { + // If we're waiting on the connection lock, the link might be in the process of closing. If this happens + // we return. return 0; + } else { + if (log.isDebugEnabled()) { + log.debug("Couldn't get lock on deliverMessage " + this); + } } + } + + try { final Delivery delivery; delivery = sender.delivery(tag, 0, tag.length); delivery.setMessageFormat((int) message.getMessageFormat()); @@ -636,10 +658,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { sender.advance(); } + connection.flush(); + } finally { + connection.unlock(); } - connection.flush(); - return size; } finally { nettyBuffer.release(); @@ -659,7 +682,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) { + private static String createQueueName(String clientId, + String pubId, + boolean shared, + boolean global, + boolean isVolatile) { String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; if (shared) { if (queue.contains("|")) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 91b252b33f..fc6cbf64f2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -58,7 +59,7 @@ public class ProtonHandler extends ProtonInitializable { private Sasl serverSasl; - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); private final long creationTime; @@ -79,38 +80,41 @@ public class ProtonHandler extends ProtonInitializable { } public long tick(boolean firstTick) { + lock.lock(); try { - synchronized (lock) { - if (!firstTick) { - try { - if (connection.getLocalState() != EndpointState.CLOSED) { - long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); - if (transport.isClosed()) { - throw new IllegalStateException("Channel was inactive for to long"); - } - return rescheduleAt; + if (!firstTick) { + try { + if (connection.getLocalState() != EndpointState.CLOSED) { + long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + if (transport.isClosed()) { + throw new IllegalStateException("Channel was inactive for to long"); } - } catch (Exception e) { - log.warn(e.getMessage(), e); - transport.close(); - connection.setCondition(new ErrorCondition()); + return rescheduleAt; } - return 0; + } catch (Exception e) { + log.warn(e.getMessage(), e); + transport.close(); + connection.setCondition(new ErrorCondition()); } - return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + return 0; } + return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); } finally { + lock.unlock(); flushBytes(); } } public int capacity() { - synchronized (lock) { + lock.lock(); + try { return transport.capacity(); + } finally { + lock.unlock(); } } - public Object getLock() { + public ReentrantLock getLock() { return lock; } @@ -142,7 +146,8 @@ public class ProtonHandler extends ProtonInitializable { } public void flushBytes() { - synchronized (lock) { + lock.lock(); + try { while (true) { int pending = transport.pending(); @@ -161,17 +166,19 @@ public class ProtonHandler extends ProtonInitializable { transport.pop(pending); } + } finally { + lock.unlock(); } } - public SASLResult getSASLResult() { return saslResult; } public void inputBuffer(ByteBuf buffer) { dataReceived = true; - synchronized (lock) { + lock.lock(); + try { while (buffer.readableBytes() > 0) { int capacity = transport.capacity(); @@ -208,6 +215,8 @@ public class ProtonHandler extends ProtonInitializable { break; } } + } finally { + lock.unlock(); } } @@ -224,20 +233,26 @@ public class ProtonHandler extends ProtonInitializable { } public void flush() { - synchronized (lock) { + lock.lock(); + try { transport.process(); checkServerSASL(); + } finally { + lock.unlock(); } dispatch(); } public void close(ErrorCondition errorCondition) { - synchronized (lock) { + lock.lock(); + try { if (errorCondition != null) { connection.setCondition(errorCondition); } connection.close(); + } finally { + lock.unlock(); } flush(); @@ -283,7 +298,8 @@ public class ProtonHandler extends ProtonInitializable { private void dispatch() { Event ev; - synchronized (lock) { + lock.lock(); + try { if (inDispatch) { // Avoid recursion from events return; @@ -309,6 +325,8 @@ public class ProtonHandler extends ProtonInitializable { } finally { inDispatch = false; } + } finally { + lock.unlock(); } flushBytes(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index f817ed4420..4579f1c361 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -72,7 +72,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { ByteBuffer buffer; MessageImpl msg; - synchronized (connection.getLock()) { + connection.lock(); + try { // Replenish coordinator receiver credit on exhaustion so sender can continue // transaction declare and discahrge operations. if (receiver.getCredit() < amqpLowMark) { @@ -94,6 +95,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { receiver.advance(); msg = decodeMessage(buffer); + } finally { + connection.unlock(); } Object action = ((AmqpValue) msg.getBody()).getValue(); @@ -102,45 +105,63 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(declared); + } finally { + connection.unlock(); } } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; Binary txID = discharge.getTxnId(); - ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true); + ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true); tx.discharge(); if (discharge.getFail()) { tx.rollback(); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(new Accepted()); + } finally { + connection.unlock(); } connection.flush(); } else { tx.commit(); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(new Accepted()); + } finally { + connection.unlock(); } connection.flush(); } } } catch (ActiveMQAMQPException amqpE) { log.warn(amqpE.getMessage(), amqpE); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); + } finally { + connection.unlock(); } connection.flush(); } catch (Throwable e) { log.warn(e.getMessage(), e); - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + } finally { + connection.unlock(); } connection.flush(); } finally { - synchronized (connection.getLock()) { + connection.lock(); + try { delivery.settle(); + } finally { + connection.unlock(); } connection.flush(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 09a44ddd29..4ee94c2a9d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -1584,6 +1584,45 @@ public class ProtonTest extends ProtonTestBase { System.out.println("taken = " + taken); } + @Test + public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { + String name = "exampleQueue1"; + + int numMessages = 50; + + System.out.println("1. Send messages into queue"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(name); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < numMessages; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + p.send(message); + } + p.close(); + session.close(); + + System.out.println("2. Receive one by one, each in its own session"); + for (int i = 0; i < numMessages; i++) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(name); + MessageConsumer c = session.createConsumer(queue); + Message m = c.receive(1000); + p.close(); + session.close(); + } + + System.out.println("3. Try to receive 10 in the same session"); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(name); + MessageConsumer c = session.createConsumer(queue); + for (int i = 0; i < numMessages; i++) { + Message m = c.receive(1000); + } + p.close(); + session.close(); + } + @Test public void testSimpleObject() throws Throwable { final int numMessages = 1;