This commit is contained in:
Clebert Suconic 2017-03-24 17:46:01 -04:00
commit f2e0891b0e
3 changed files with 37 additions and 22 deletions

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
@ -50,12 +56,6 @@ import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@ -210,7 +210,6 @@ public class AMQPConnectionContext extends ProtonInitializable {
} else {
Sender sender = (Sender) link;
protonSession.addSender(sender);
sender.offer(1);
}
}
@ -421,8 +420,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteClose(Link link) throws Exception {
link.close();
link.free();
synchronized (getLock()) {
link.close();
link.free();
}
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
@ -431,8 +432,12 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteDetach(Link link) throws Exception {
link.detach();
link.free();
synchronized (getLock()) {
link.detach();
link.free();
}
flush();
}
@Override

View File

@ -147,8 +147,10 @@ public class AMQPSessionContext extends ProtonInitializable {
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
receiver.open();
receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
synchronized (connection.getLock()) {
receiver.open();
receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
}
}
public void addSender(Sender sender) throws Exception {
@ -161,13 +163,17 @@ public class AMQPSessionContext extends ProtonInitializable {
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
sender.open();
synchronized (connection.getLock()) {
sender.open();
}
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
sender.close();
synchronized (connection.getLock()) {
sender.close();
}
}
}
@ -185,12 +191,16 @@ public class AMQPSessionContext extends ProtonInitializable {
protonReceiver.initialise();
receivers.put(receiver, protonReceiver);
receiver.setContext(protonReceiver);
receiver.open();
synchronized (connection.getLock()) {
receiver.open();
}
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
receiver.close();
synchronized (connection.getLock()) {
receiver.close();
}
}
}
}

View File

@ -52,7 +52,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
*/
*/
private static int maxCreditAllocation = 100;
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
@ -170,8 +170,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
synchronized (connection.getLock()) {
delivery.disposition(rejected);
delivery.settle();
}
}
}
@ -204,7 +206,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
connection.flush();
}
}
public void drain(int credits) {
@ -221,5 +222,4 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public boolean isDraining() {
return receiver.draining();
}
}