ARTEMIS-1111 Avoid deadlock on AMQP delivery during close
This commit is contained in:
parent
3ff9057ac4
commit
930df5b663
|
@ -92,10 +92,6 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private final AtomicBoolean draining = new AtomicBoolean(false);
|
||||
|
||||
public Object getProtonLock() {
|
||||
return connection.getLock();
|
||||
}
|
||||
|
||||
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
|
||||
ProtonProtocolManager manager,
|
||||
AMQPConnectionContext connection,
|
||||
|
@ -203,19 +199,31 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
|
||||
}
|
||||
|
||||
public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
|
||||
public void createTemporaryQueue(String address,
|
||||
String queueName,
|
||||
RoutingType routingType,
|
||||
String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
|
||||
}
|
||||
|
||||
public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
|
||||
public void createUnsharedDurableQueue(String address,
|
||||
RoutingType routingType,
|
||||
String queueName,
|
||||
String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
|
||||
}
|
||||
|
||||
public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
|
||||
public void createSharedDurableQueue(String address,
|
||||
RoutingType routingType,
|
||||
String queueName,
|
||||
String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
|
||||
}
|
||||
|
||||
public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
|
||||
public void createSharedVolatileQueue(String address,
|
||||
RoutingType routingType,
|
||||
String queueName,
|
||||
String filter) throws Exception {
|
||||
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
|
||||
}
|
||||
|
||||
|
@ -250,7 +258,9 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
return bindingQueryResult.isExists();
|
||||
}
|
||||
|
||||
public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
|
||||
public AddressQueryResult addressQuery(String addressName,
|
||||
RoutingType routingType,
|
||||
boolean autoCreate) throws Exception {
|
||||
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
|
||||
|
||||
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
|
||||
|
@ -395,9 +405,13 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
condition.setDescription(errorMessage);
|
||||
Rejected rejected = new Rejected();
|
||||
rejected.setError(condition);
|
||||
synchronized (connection.getLock()) {
|
||||
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(rejected);
|
||||
delivery.settle();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
@ -415,7 +429,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
if (delivery.getRemoteState() instanceof TransactionalState) {
|
||||
TransactionalState txAccepted = new TransactionalState();
|
||||
txAccepted.setOutcome(Accepted.getInstance());
|
||||
|
@ -426,15 +441,20 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
delivery.disposition(Accepted.getInstance());
|
||||
}
|
||||
delivery.settle();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
|
||||
connection.flush();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -449,9 +469,12 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
final Receiver receiver) {
|
||||
try {
|
||||
if (address == null) {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.flow(credits);
|
||||
connection.flush();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -505,9 +528,12 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
try {
|
||||
return plugSender.deliverMessage(ref, deliveryCount);
|
||||
} catch (Exception e) {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
|
||||
connection.flush();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
throw new IllegalStateException("Can't deliver message " + e, e);
|
||||
}
|
||||
|
@ -538,13 +564,14 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
@Override
|
||||
public void disconnect(ServerConsumer consumer, String queueName) {
|
||||
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
|
||||
connection.lock();
|
||||
try {
|
||||
synchronized (connection.getLock()) {
|
||||
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
|
||||
connection.flush();
|
||||
}
|
||||
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
|
||||
connection.flush();
|
||||
} catch (ActiveMQAMQPException e) {
|
||||
logger.error("Error closing link for " + consumer.getQueue().getAddress());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -567,18 +594,18 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
return protonSPI.newTransaction();
|
||||
}
|
||||
|
||||
|
||||
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
|
||||
return serverSession.getMatchingQueue(address, routingType);
|
||||
}
|
||||
|
||||
|
||||
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
|
||||
public SimpleString getMatchingQueue(SimpleString address,
|
||||
SimpleString queueName,
|
||||
RoutingType routingType) throws Exception {
|
||||
return serverSession.getMatchingQueue(address, queueName, routingType);
|
||||
}
|
||||
|
||||
public AddressInfo getAddress(SimpleString address) {
|
||||
return serverSession.getAddress(address);
|
||||
return serverSession.getAddress(address);
|
||||
}
|
||||
|
||||
public void removeTemporaryQueue(String address) throws Exception {
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.UUID;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
|
@ -128,10 +129,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
return false;
|
||||
}
|
||||
|
||||
public Object getLock() {
|
||||
public ReentrantLock getLock() {
|
||||
return handler.getLock();
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
handler.getLock().lock();
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
handler.getLock().unlock();
|
||||
}
|
||||
|
||||
public int capacity() {
|
||||
return handler.capacity();
|
||||
}
|
||||
|
@ -319,7 +328,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
handler.flushBytes();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void pushBytes(ByteBuf bytes) {
|
||||
connectionCallback.onTransport(bytes, this);
|
||||
|
@ -327,7 +335,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
@Override
|
||||
public void onRemoteOpen(Connection connection) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
try {
|
||||
initInternal();
|
||||
} catch (Exception e) {
|
||||
|
@ -342,6 +351,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
|
||||
connection.open();
|
||||
}
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
initialise();
|
||||
|
||||
|
@ -367,9 +378,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
@Override
|
||||
public void onRemoteClose(Connection connection) {
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
connection.close();
|
||||
connection.free();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
for (AMQPSessionContext protonSession : sessions.values()) {
|
||||
|
@ -390,8 +404,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
@Override
|
||||
public void onRemoteOpen(Session session) throws Exception {
|
||||
getSessionExtension(session).initialise();
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
session.open();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -401,9 +418,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
@Override
|
||||
public void onRemoteClose(Session session) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
session.close();
|
||||
session.free();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
|
||||
|
@ -428,10 +448,14 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
@Override
|
||||
public void onRemoteClose(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
link.close();
|
||||
link.free();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
|
||||
if (linkContext != null) {
|
||||
linkContext.close(true);
|
||||
|
@ -440,11 +464,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
@Override
|
||||
public void onRemoteDetach(Link link) throws Exception {
|
||||
synchronized (getLock()) {
|
||||
lock();
|
||||
try {
|
||||
link.detach();
|
||||
link.free();
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -147,9 +147,12 @@ public class AMQPSessionContext extends ProtonInitializable {
|
|||
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
|
||||
|
||||
receiver.setContext(transactionHandler);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.open();
|
||||
receiver.flow(connection.getAmqpCredits());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,16 +166,23 @@ public class AMQPSessionContext extends ProtonInitializable {
|
|||
senders.put(sender, protonSender);
|
||||
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
|
||||
sender.setContext(protonSender);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
sender.open();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
|
||||
protonSender.start();
|
||||
} catch (ActiveMQAMQPException e) {
|
||||
senders.remove(sender);
|
||||
sender.setSource(null);
|
||||
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
sender.close();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -191,15 +201,21 @@ public class AMQPSessionContext extends ProtonInitializable {
|
|||
protonReceiver.initialise();
|
||||
receivers.put(receiver, protonReceiver);
|
||||
receiver.setContext(protonReceiver);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.open();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
} catch (ActiveMQAMQPException e) {
|
||||
receivers.remove(receiver);
|
||||
receiver.setTarget(null);
|
||||
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.close();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
if (remoteDesiredCapabilities != null) {
|
||||
List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);
|
||||
if (list.contains(AmqpSupport.DELAYED_DELIVERY)) {
|
||||
receiver.setOfferedCapabilities(new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
|
||||
receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -179,9 +179,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
condition.setCondition(Symbol.valueOf("failed"));
|
||||
condition.setDescription(e.getMessage());
|
||||
rejected.setError(condition);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(rejected);
|
||||
delivery.settle();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -210,16 +213,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
if (sessionSPI != null) {
|
||||
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
|
||||
} else {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.flow(credits);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
|
||||
public void drain(int credits) {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
receiver.drain(credits);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
@ -95,7 +96,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
private boolean isVolatile = false;
|
||||
private String tempQueueName;
|
||||
|
||||
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
|
||||
public ProtonServerSenderContext(AMQPConnectionContext connection,
|
||||
Sender sender,
|
||||
AMQPSessionContext protonSession,
|
||||
AMQPSessionCallback server) {
|
||||
super();
|
||||
this.connection = connection;
|
||||
this.sender = sender;
|
||||
|
@ -246,7 +250,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
}
|
||||
//check to see if the client has defined how we act
|
||||
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
|
||||
if (clientDefined) {
|
||||
if (clientDefined) {
|
||||
multicast = hasCapabilities(TOPIC, source);
|
||||
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
|
||||
if (!addressQueryResult.isExists()) {
|
||||
|
@ -293,9 +297,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
supportedFilters.put(filter.getKey(), filter.getValue());
|
||||
}
|
||||
|
||||
|
||||
if (queueNameToUse != null) {
|
||||
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST );
|
||||
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
|
||||
queue = matchingAnycastQueue.toString();
|
||||
}
|
||||
//if the address specifies a broker configured queue then we always use this, treat it as a queue
|
||||
|
@ -313,8 +316,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
if (result.isExists()) {
|
||||
// If a client reattaches to a durable subscription with a different no-local
|
||||
// filter value, selector or address then we must recreate the queue (JMS semantics).
|
||||
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
|
||||
(sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
|
||||
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
|
||||
|
||||
if (result.getConsumerCount() == 0) {
|
||||
sessionSPI.deleteQueue(queue);
|
||||
|
@ -392,7 +394,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
|
||||
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
|
||||
try {
|
||||
brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
||||
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
||||
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
|
||||
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
|
||||
} catch (Exception e) {
|
||||
|
@ -404,7 +406,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return connection.getRemoteContainer();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* close the session
|
||||
*/
|
||||
|
@ -415,8 +416,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
sender.setCondition(condition);
|
||||
}
|
||||
protonSession.removeSender(sender);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
sender.close();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
|
||||
|
@ -442,7 +446,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
Source source = (Source) sender.getSource();
|
||||
if (source != null && source.getAddress() != null && multicast) {
|
||||
String queueName = source.getAddress();
|
||||
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
|
||||
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
|
||||
if (result.isExists() && source.getDynamic()) {
|
||||
sessionSPI.deleteQueue(queueName);
|
||||
} else {
|
||||
|
@ -489,8 +493,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
|
||||
DeliveryState remoteState;
|
||||
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
remoteState = delivery.getRemoteState();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
|
||||
boolean settleImmediate = true;
|
||||
|
@ -509,8 +516,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
TransactionalState txAccepted = new TransactionalState();
|
||||
txAccepted.setOutcome(Accepted.getInstance());
|
||||
txAccepted.setTxnId(txState.getTxnId());
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(txAccepted);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
// we have to individual ack as we can't guarantee we will get the delivery
|
||||
|
@ -556,7 +566,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
Modified modification = (Modified) remoteState;
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
|
||||
message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
|
||||
message.rejectConsumer(((Consumer) brokerConsumer).sequentialID());
|
||||
}
|
||||
|
||||
if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
|
||||
|
@ -585,8 +595,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
}
|
||||
|
||||
public void settle(Delivery delivery) {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.settle();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -617,10 +630,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
|
||||
int size = nettyBuffer.writerIndex();
|
||||
|
||||
synchronized (connection.getLock()) {
|
||||
if (sender.getLocalState() == EndpointState.CLOSED) {
|
||||
while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) {
|
||||
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
|
||||
// If we're waiting on the connection lock, the link might be in the process of closing. If this happens
|
||||
// we return.
|
||||
return 0;
|
||||
} else {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Couldn't get lock on deliverMessage " + this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final Delivery delivery;
|
||||
delivery = sender.delivery(tag, 0, tag.length);
|
||||
delivery.setMessageFormat((int) message.getMessageFormat());
|
||||
|
@ -636,10 +658,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
} else {
|
||||
sender.advance();
|
||||
}
|
||||
connection.flush();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
|
||||
connection.flush();
|
||||
|
||||
return size;
|
||||
} finally {
|
||||
nettyBuffer.release();
|
||||
|
@ -659,7 +682,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return false;
|
||||
}
|
||||
|
||||
private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
|
||||
private static String createQueueName(String clientId,
|
||||
String pubId,
|
||||
boolean shared,
|
||||
boolean global,
|
||||
boolean isVolatile) {
|
||||
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
|
||||
if (shared) {
|
||||
if (queue.contains("|")) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
@ -58,7 +59,7 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
private Sasl serverSasl;
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private final long creationTime;
|
||||
|
||||
|
@ -79,38 +80,41 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
}
|
||||
|
||||
public long tick(boolean firstTick) {
|
||||
lock.lock();
|
||||
try {
|
||||
synchronized (lock) {
|
||||
if (!firstTick) {
|
||||
try {
|
||||
if (connection.getLocalState() != EndpointState.CLOSED) {
|
||||
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (transport.isClosed()) {
|
||||
throw new IllegalStateException("Channel was inactive for to long");
|
||||
}
|
||||
return rescheduleAt;
|
||||
if (!firstTick) {
|
||||
try {
|
||||
if (connection.getLocalState() != EndpointState.CLOSED) {
|
||||
long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
if (transport.isClosed()) {
|
||||
throw new IllegalStateException("Channel was inactive for to long");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
transport.close();
|
||||
connection.setCondition(new ErrorCondition());
|
||||
return rescheduleAt;
|
||||
}
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
transport.close();
|
||||
connection.setCondition(new ErrorCondition());
|
||||
}
|
||||
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
return 0;
|
||||
}
|
||||
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
|
||||
} finally {
|
||||
lock.unlock();
|
||||
flushBytes();
|
||||
}
|
||||
}
|
||||
|
||||
public int capacity() {
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
return transport.capacity();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Object getLock() {
|
||||
public ReentrantLock getLock() {
|
||||
return lock;
|
||||
}
|
||||
|
||||
|
@ -142,7 +146,8 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
}
|
||||
|
||||
public void flushBytes() {
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
while (true) {
|
||||
int pending = transport.pending();
|
||||
|
||||
|
@ -161,17 +166,19 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
|
||||
transport.pop(pending);
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public SASLResult getSASLResult() {
|
||||
return saslResult;
|
||||
}
|
||||
|
||||
public void inputBuffer(ByteBuf buffer) {
|
||||
dataReceived = true;
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
while (buffer.readableBytes() > 0) {
|
||||
int capacity = transport.capacity();
|
||||
|
||||
|
@ -208,6 +215,8 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -224,20 +233,26 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
}
|
||||
|
||||
public void flush() {
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
transport.process();
|
||||
checkServerSASL();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
dispatch();
|
||||
}
|
||||
|
||||
public void close(ErrorCondition errorCondition) {
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
if (errorCondition != null) {
|
||||
connection.setCondition(errorCondition);
|
||||
}
|
||||
connection.close();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
flush();
|
||||
|
@ -283,7 +298,8 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
private void dispatch() {
|
||||
Event ev;
|
||||
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
if (inDispatch) {
|
||||
// Avoid recursion from events
|
||||
return;
|
||||
|
@ -309,6 +325,8 @@ public class ProtonHandler extends ProtonInitializable {
|
|||
} finally {
|
||||
inDispatch = false;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
flushBytes();
|
||||
|
|
|
@ -72,7 +72,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
ByteBuffer buffer;
|
||||
MessageImpl msg;
|
||||
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
// Replenish coordinator receiver credit on exhaustion so sender can continue
|
||||
// transaction declare and discahrge operations.
|
||||
if (receiver.getCredit() < amqpLowMark) {
|
||||
|
@ -94,6 +95,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
receiver.advance();
|
||||
|
||||
msg = decodeMessage(buffer);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
|
||||
Object action = ((AmqpValue) msg.getBody()).getValue();
|
||||
|
@ -102,45 +105,63 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
Binary txID = sessionSPI.newTransaction();
|
||||
Declared declared = new Declared();
|
||||
declared.setTxnId(txID);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(declared);
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
} else if (action instanceof Discharge) {
|
||||
Discharge discharge = (Discharge) action;
|
||||
|
||||
Binary txID = discharge.getTxnId();
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl) sessionSPI.getTransaction(txID, true);
|
||||
tx.discharge();
|
||||
|
||||
if (discharge.getFail()) {
|
||||
tx.rollback();
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(new Accepted());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
} else {
|
||||
tx.commit();
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(new Accepted());
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
} catch (ActiveMQAMQPException amqpE) {
|
||||
log.warn(amqpE.getMessage(), amqpE);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
} catch (Throwable e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
} finally {
|
||||
synchronized (connection.getLock()) {
|
||||
connection.lock();
|
||||
try {
|
||||
delivery.settle();
|
||||
} finally {
|
||||
connection.unlock();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
|
|
@ -1584,6 +1584,45 @@ public class ProtonTest extends ProtonTestBase {
|
|||
System.out.println("taken = " + taken);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
|
||||
String name = "exampleQueue1";
|
||||
|
||||
int numMessages = 50;
|
||||
|
||||
System.out.println("1. Send messages into queue");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(name);
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("Message temporary");
|
||||
p.send(message);
|
||||
}
|
||||
p.close();
|
||||
session.close();
|
||||
|
||||
System.out.println("2. Receive one by one, each in its own session");
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
queue = session.createQueue(name);
|
||||
MessageConsumer c = session.createConsumer(queue);
|
||||
Message m = c.receive(1000);
|
||||
p.close();
|
||||
session.close();
|
||||
}
|
||||
|
||||
System.out.println("3. Try to receive 10 in the same session");
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
queue = session.createQueue(name);
|
||||
MessageConsumer c = session.createConsumer(queue);
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
Message m = c.receive(1000);
|
||||
}
|
||||
p.close();
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleObject() throws Throwable {
|
||||
final int numMessages = 1;
|
||||
|
|
Loading…
Reference in New Issue