This commit is contained in:
Clebert Suconic 2017-04-12 12:35:26 -04:00
commit 851803daa1
8 changed files with 275 additions and 92 deletions

View File

@ -92,10 +92,6 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false);
public Object getProtonLock() {
return connection.getLock();
}
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@ -203,19 +199,31 @@ public class AMQPSessionCallback implements SessionCallback {
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false);
}
public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception {
public void createTemporaryQueue(String address,
String queueName,
RoutingType routingType,
String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false);
}
public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
public void createUnsharedDurableQueue(String address,
RoutingType routingType,
String queueName,
String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false);
}
public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
public void createSharedDurableQueue(String address,
RoutingType routingType,
String queueName,
String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false);
}
public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception {
public void createSharedVolatileQueue(String address,
RoutingType routingType,
String queueName,
String filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true);
}
@ -250,7 +258,9 @@ public class AMQPSessionCallback implements SessionCallback {
return bindingQueryResult.isExists();
}
public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception {
public AddressQueryResult addressQuery(String addressName,
RoutingType routingType,
boolean autoCreate) throws Exception {
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
@ -395,9 +405,13 @@ public class AMQPSessionCallback implements SessionCallback {
condition.setDescription(errorMessage);
Rejected rejected = new Rejected();
rejected.setError(condition);
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(rejected);
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
}
@ -415,7 +429,8 @@ public class AMQPSessionCallback implements SessionCallback {
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
synchronized (connection.getLock()) {
connection.lock();
try {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
@ -426,15 +441,20 @@ public class AMQPSessionCallback implements SessionCallback {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
connection.flush();
} finally {
connection.unlock();
}
}
});
@ -449,9 +469,12 @@ public class AMQPSessionCallback implements SessionCallback {
final Receiver receiver) {
try {
if (address == null) {
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.flow(credits);
connection.flush();
} finally {
connection.unlock();
}
return;
}
@ -505,9 +528,12 @@ public class AMQPSessionCallback implements SessionCallback {
try {
return plugSender.deliverMessage(ref, deliveryCount);
} catch (Exception e) {
synchronized (connection.getLock()) {
connection.lock();
try {
plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
connection.flush();
} finally {
connection.unlock();
}
throw new IllegalStateException("Can't deliver message " + e, e);
}
@ -538,13 +564,14 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public void disconnect(ServerConsumer consumer, String queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
connection.lock();
try {
synchronized (connection.getLock()) {
((ProtonServerSenderContext) consumer.getProtocolContext()).close(ec);
connection.flush();
}
} catch (ActiveMQAMQPException e) {
logger.error("Error closing link for " + consumer.getQueue().getAddress());
} finally {
connection.unlock();
}
}
@ -567,13 +594,13 @@ public class AMQPSessionCallback implements SessionCallback {
return protonSPI.newTransaction();
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, routingType);
}
public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception {
public SimpleString getMatchingQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType) throws Exception {
return serverSession.getMatchingQueue(address, queueName, routingType);
}

View File

@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -128,10 +129,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return false;
}
public Object getLock() {
public ReentrantLock getLock() {
return handler.getLock();
}
public void lock() {
handler.getLock().lock();
}
public void unlock() {
handler.getLock().unlock();
}
public int capacity() {
return handler.capacity();
}
@ -319,7 +328,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.flushBytes();
}
@Override
public void pushBytes(ByteBuf bytes) {
connectionCallback.onTransport(bytes, this);
@ -327,7 +335,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Connection connection) throws Exception {
synchronized (getLock()) {
lock();
try {
try {
initInternal();
} catch (Exception e) {
@ -342,6 +351,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
connection.open();
}
} finally {
unlock();
}
initialise();
@ -367,9 +378,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Connection connection) {
synchronized (getLock()) {
lock();
try {
connection.close();
connection.free();
} finally {
unlock();
}
for (AMQPSessionContext protonSession : sessions.values()) {
@ -390,8 +404,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteOpen(Session session) throws Exception {
getSessionExtension(session).initialise();
synchronized (getLock()) {
lock();
try {
session.open();
} finally {
unlock();
}
}
@ -401,9 +418,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Session session) throws Exception {
synchronized (getLock()) {
lock();
try {
session.close();
session.free();
} finally {
unlock();
}
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@ -428,10 +448,14 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteClose(Link link) throws Exception {
synchronized (getLock()) {
lock();
try {
link.close();
link.free();
} finally {
unlock();
}
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
@ -440,11 +464,13 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
@Override
public void onRemoteDetach(Link link) throws Exception {
synchronized (getLock()) {
lock();
try {
link.detach();
link.free();
} finally {
unlock();
}
}
@Override

View File

@ -147,9 +147,12 @@ public class AMQPSessionContext extends ProtonInitializable {
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
receiver.setContext(transactionHandler);
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.open();
receiver.flow(connection.getAmqpCredits());
} finally {
connection.unlock();
}
}
@ -163,16 +166,23 @@ public class AMQPSessionContext extends ProtonInitializable {
senders.put(sender, protonSender);
serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
sender.setContext(protonSender);
synchronized (connection.getLock()) {
connection.lock();
try {
sender.open();
} finally {
connection.unlock();
}
protonSender.start();
} catch (ActiveMQAMQPException e) {
senders.remove(sender);
sender.setSource(null);
sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
synchronized (connection.getLock()) {
connection.lock();
try {
sender.close();
} finally {
connection.unlock();
}
}
}
@ -191,15 +201,21 @@ public class AMQPSessionContext extends ProtonInitializable {
protonReceiver.initialise();
receivers.put(receiver, protonReceiver);
receiver.setContext(protonReceiver);
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.open();
} finally {
connection.unlock();
}
} catch (ActiveMQAMQPException e) {
receivers.remove(receiver);
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.close();
} finally {
connection.unlock();
}
}
}

View File

@ -179,9 +179,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(e.getMessage());
rejected.setError(condition);
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(rejected);
delivery.settle();
} finally {
connection.unlock();
}
}
}
@ -210,16 +213,22 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (sessionSPI != null) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
} else {
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.flow(credits);
} finally {
connection.unlock();
}
connection.flush();
}
}
public void drain(int credits) {
synchronized (connection.getLock()) {
connection.lock();
try {
receiver.drain(credits);
} finally {
connection.unlock();
}
connection.flush();
}

View File

@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@ -95,7 +96,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean isVolatile = false;
private String tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
AMQPSessionContext protonSession,
AMQPSessionCallback server) {
super();
this.connection = connection;
this.sender = sender;
@ -293,7 +297,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
supportedFilters.put(filter.getKey(), filter.getValue());
}
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
queue = matchingAnycastQueue.toString();
@ -313,8 +316,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
(sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);
@ -404,7 +406,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return connection.getRemoteContainer();
}
/*
* close the session
*/
@ -415,8 +416,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.setCondition(condition);
}
protonSession.removeSender(sender);
synchronized (connection.getLock()) {
connection.lock();
try {
sender.close();
} finally {
connection.unlock();
}
connection.flush();
@ -489,8 +493,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
DeliveryState remoteState;
synchronized (connection.getLock()) {
connection.lock();
try {
remoteState = delivery.getRemoteState();
} finally {
connection.unlock();
}
boolean settleImmediate = true;
@ -509,8 +516,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId());
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(txAccepted);
} finally {
connection.unlock();
}
}
// we have to individual ack as we can't guarantee we will get the delivery
@ -585,8 +595,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
public void settle(Delivery delivery) {
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.settle();
} finally {
connection.unlock();
}
}
@ -617,10 +630,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
int size = nettyBuffer.writerIndex();
synchronized (connection.getLock()) {
if (sender.getLocalState() == EndpointState.CLOSED) {
while (!connection.getLock().tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
// If we're waiting on the connection lock, the link might be in the process of closing. If this happens
// we return.
return 0;
} else {
if (log.isDebugEnabled()) {
log.debug("Couldn't get lock on deliverMessage " + this);
}
}
}
try {
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
delivery.setMessageFormat((int) message.getMessageFormat());
@ -636,9 +658,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
sender.advance();
}
}
connection.flush();
} finally {
connection.unlock();
}
return size;
} finally {
@ -659,7 +682,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false;
}
private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) {
private static String createQueueName(String clientId,
String pubId,
boolean shared,
boolean global,
boolean isVolatile) {
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
if (shared) {
if (queue.contains("|")) {

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@ -58,7 +59,7 @@ public class ProtonHandler extends ProtonInitializable {
private Sasl serverSasl;
private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final long creationTime;
@ -79,8 +80,8 @@ public class ProtonHandler extends ProtonInitializable {
}
public long tick(boolean firstTick) {
lock.lock();
try {
synchronized (lock) {
if (!firstTick) {
try {
if (connection.getLocalState() != EndpointState.CLOSED) {
@ -98,19 +99,22 @@ public class ProtonHandler extends ProtonInitializable {
return 0;
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
}
} finally {
lock.unlock();
flushBytes();
}
}
public int capacity() {
synchronized (lock) {
lock.lock();
try {
return transport.capacity();
} finally {
lock.unlock();
}
}
public Object getLock() {
public ReentrantLock getLock() {
return lock;
}
@ -142,7 +146,8 @@ public class ProtonHandler extends ProtonInitializable {
}
public void flushBytes() {
synchronized (lock) {
lock.lock();
try {
while (true) {
int pending = transport.pending();
@ -161,17 +166,19 @@ public class ProtonHandler extends ProtonInitializable {
transport.pop(pending);
}
} finally {
lock.unlock();
}
}
public SASLResult getSASLResult() {
return saslResult;
}
public void inputBuffer(ByteBuf buffer) {
dataReceived = true;
synchronized (lock) {
lock.lock();
try {
while (buffer.readableBytes() > 0) {
int capacity = transport.capacity();
@ -208,6 +215,8 @@ public class ProtonHandler extends ProtonInitializable {
break;
}
}
} finally {
lock.unlock();
}
}
@ -224,20 +233,26 @@ public class ProtonHandler extends ProtonInitializable {
}
public void flush() {
synchronized (lock) {
lock.lock();
try {
transport.process();
checkServerSASL();
} finally {
lock.unlock();
}
dispatch();
}
public void close(ErrorCondition errorCondition) {
synchronized (lock) {
lock.lock();
try {
if (errorCondition != null) {
connection.setCondition(errorCondition);
}
connection.close();
} finally {
lock.unlock();
}
flush();
@ -283,7 +298,8 @@ public class ProtonHandler extends ProtonInitializable {
private void dispatch() {
Event ev;
synchronized (lock) {
lock.lock();
try {
if (inDispatch) {
// Avoid recursion from events
return;
@ -309,6 +325,8 @@ public class ProtonHandler extends ProtonInitializable {
} finally {
inDispatch = false;
}
} finally {
lock.unlock();
}
flushBytes();

View File

@ -72,7 +72,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
ByteBuffer buffer;
MessageImpl msg;
synchronized (connection.getLock()) {
connection.lock();
try {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < amqpLowMark) {
@ -94,6 +95,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
receiver.advance();
msg = decodeMessage(buffer);
} finally {
connection.unlock();
}
Object action = ((AmqpValue) msg.getBody()).getValue();
@ -102,8 +105,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(declared);
} finally {
connection.unlock();
}
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
@ -114,33 +120,48 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
if (discharge.getFail()) {
tx.rollback();
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(new Accepted());
} finally {
connection.unlock();
}
connection.flush();
} else {
tx.commit();
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(new Accepted());
} finally {
connection.unlock();
}
connection.flush();
}
}
} catch (ActiveMQAMQPException amqpE) {
log.warn(amqpE.getMessage(), amqpE);
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
} finally {
connection.unlock();
}
connection.flush();
} catch (Throwable e) {
log.warn(e.getMessage(), e);
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
} finally {
connection.unlock();
}
connection.flush();
} finally {
synchronized (connection.getLock()) {
connection.lock();
try {
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
}

View File

@ -1584,6 +1584,45 @@ public class ProtonTest extends ProtonTestBase {
System.out.println("taken = " + taken);
}
@Test
public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
String name = "exampleQueue1";
int numMessages = 50;
System.out.println("1. Send messages into queue");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(name);
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < numMessages; i++) {
TextMessage message = session.createTextMessage();
message.setText("Message temporary");
p.send(message);
}
p.close();
session.close();
System.out.println("2. Receive one by one, each in its own session");
for (int i = 0; i < numMessages; i++) {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue);
Message m = c.receive(1000);
p.close();
session.close();
}
System.out.println("3. Try to receive 10 in the same session");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(name);
MessageConsumer c = session.createConsumer(queue);
for (int i = 0; i < numMessages; i++) {
Message m = c.receive(1000);
}
p.close();
session.close();
}
@Test
public void testSimpleObject() throws Throwable {
final int numMessages = 1;