This commit is contained in:
Clebert Suconic 2018-11-06 22:00:26 -05:00
commit 7c5470548a
5 changed files with 136 additions and 1 deletions
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp
artemis-server/src/main/java/org/apache/activemq/artemis
core/server/impl
spi/core/protocol
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -109,6 +110,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
private ProtonTransactionHandler transactionHandler;
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@ -690,6 +693,14 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
@Override
public Transaction getCurrentTransaction() {
if (this.transactionHandler != null) {
return this.transactionHandler.getCurrentTransaction();
}
return null;
}
public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
return protonSPI.getTransaction(txid, remove);
}
@ -740,6 +751,14 @@ public class AMQPSessionCallback implements SessionCallback {
serverSession.removeProducer(name);
}
public void setTransactionHandler(ProtonTransactionHandler transactionHandler) {
this.transactionHandler = transactionHandler;
}
public ProtonTransactionHandler getTransactionHandler() {
return this.transactionHandler;
}
class AddressQueryCache<T> {
SimpleString address;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@ -47,6 +48,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private final int amqpCredit;
private final int amqpLowMark;
private Transaction currentTx;
final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection;
@ -58,6 +60,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
this.connection = connection;
this.amqpCredit = connection.getAmqpCredits();
this.amqpLowMark = connection.getAmqpLowCredits();
this.sessionSPI.setTransactionHandler(this);
}
@Override
@ -100,6 +103,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
currentTx = sessionSPI.getTransaction(txID, false);
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
@ -115,7 +119,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
@Override
public void onError(int errorCode, String errorMessage) {
currentTx = null;
}
};
sessionSPI.afterIO(ioAction);
@ -133,6 +137,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
try {
delivery.settle();
delivery.disposition(new Accepted());
currentTx = null;
} finally {
connection.unlock();
connection.flush();
@ -192,4 +197,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
message.decode(encoded);
return message;
}
public Transaction getCurrentTransaction() {
return currentTx;
}
}

View File

@ -1884,6 +1884,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return oper.getListOnConsumer(consumerId);
}
} else {
//amqp handles the transaction in callback
if (callback != null) {
Transaction transaction = callback.getCurrentTransaction();
if (transaction != null) {
RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
if (operation != null) {
return operation.getListOnConsumer(consumerId);
}
}
}
return Collections.emptyList();
}
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public interface SessionCallback {
@ -93,4 +94,8 @@ public interface SessionCallback {
default void close(boolean failed) {
}
default Transaction getCurrentTransaction() {
return null;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Map;
public class JMXManagementTest extends JMSClientTestSupport {
@Test
public void testListDeliveringMessages() throws Exception {
SimpleString queue = new SimpleString(getQueueName());
Connection connection1 = createConnection();
Connection connection2 = createConnection();
Session prodSession = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consSession = connection2.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = prodSession.createQueue(queue.toString());
QueueControl queueControl = createManagementControl(queue, queue);
MessageProducer producer = prodSession.createProducer(jmsQueue);
final int num = 20;
for (int i = 0; i < num; i++) {
TextMessage message = prodSession.createTextMessage("hello" + i);
producer.send(message);
}
connection2.start();
MessageConsumer consumer = consSession.createConsumer(jmsQueue);
for (int i = 0; i < num; i++) {
TextMessage msgRec = (TextMessage) consumer.receive(5000);
assertNotNull(msgRec);
assertEquals(msgRec.getText(), "hello" + i);
}
//before commit
assertEquals(num, queueControl.getDeliveringCount());
Map<String, Map<String, Object>[]> result = queueControl.listDeliveringMessages();
assertEquals(1, result.size());
Map<String, Object>[] msgMaps = result.entrySet().iterator().next().getValue();
assertEquals(num, msgMaps.length);
consSession.commit();
result = queueControl.listDeliveringMessages();
assertEquals(0, result.size());
consSession.close();
prodSession.close();
connection1.close();
connection2.close();
}
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);
return queueControl;
}
}