resolve https://issues.apache.org/activemq/browse/AMQ-2473 - issue with jmstemplate were producers are closed before transaction commits which is fine save when failover ocurrs. the producers need to be replayed to allow tracked messages to be replayed. added the capability to track and relay transaction producers. Can be disabled if producers out live transactions

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@891582 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-12-17 08:42:52 +00:00
parent be01a5bd80
commit b9e51d6492
6 changed files with 230 additions and 9 deletions

View File

@ -29,7 +29,6 @@ import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
@ -61,6 +60,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
private boolean restoreProducers = true;
private boolean restoreTransaction = true;
private boolean trackMessages = true;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private int currentCacheSize;
private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
@ -136,18 +136,31 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
TransactionState transactionState = (TransactionState)iter.next();
for (TransactionState transactionState : connectionState.getTransactionStates()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx: " + transactionState.getId());
}
for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
Command command = (Command)iterator.next();
for (ProducerState producerState : transactionState.getProducerStates().values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx replay producer :" + producerState.getInfo());
}
transport.oneway(producerState.getInfo());
}
for (Command command : transactionState.getCommands()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx replay: " + command);
}
transport.oneway(command);
}
for (ProducerState producerState : transactionState.getProducerStates().values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx remove replayed producer :" + producerState.getInfo());
}
transport.oneway(producerState.getInfo().createRemoveCommand());
}
}
}
@ -350,13 +363,22 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
public Response processMessage(Message send) throws Exception {
if (send != null) {
if (trackTransactions && send.getTransactionId() != null) {
ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
ProducerId producerId = send.getProducerId();
ConnectionId connectionId = producerId.getParentId().getParentId();
if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
if (transactionState != null) {
transactionState.addCommand(send);
if (trackTransactionProducers) {
// for jmstemplate, track the producer in case it is closed before commit
// and needs to be replayed
SessionState ss = cs.getSessionState(producerId.getParentId());
ProducerState producerState = ss.getProducerState(producerId);
producerState.setTransactionState(transactionState);
}
}
}
}
@ -501,6 +523,14 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.trackTransactions = trackTransactions;
}
public boolean isTrackTransactionProducers() {
return this.trackTransactionProducers;
}
public void setTrackTransactionProducers(boolean trackTransactionProducers) {
this.trackTransactionProducers = trackTransactionProducers;
}
public boolean isRestoreTransaction() {
return restoreTransaction;
}

View File

@ -21,6 +21,7 @@ import org.apache.activemq.command.ProducerInfo;
public class ProducerState {
final ProducerInfo info;
private TransactionState transactionState;
public ProducerState(ProducerInfo info) {
this.info = info;
@ -34,4 +35,11 @@ public class ProducerState {
return info;
}
public void setTransactionState(TransactionState transactionState) {
this.transactionState = transactionState;
}
public TransactionState getTransactionState() {
return transactionState;
}
}

View File

@ -50,7 +50,14 @@ public class SessionState {
}
public ProducerState removeProducer(ProducerId id) {
return producers.remove(id);
ProducerState producerState = producers.remove(id);
if (producerState != null) {
if (producerState.getTransactionState() != null) {
// allow the transaction to recreate dependent producer on recovery
producerState.getTransactionState().addProducerState(producerState);
}
}
return producerState;
}
public void addConsumer(ConsumerInfo info) {

View File

@ -18,9 +18,12 @@ package org.apache.activemq.state;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
public class TransactionState {
@ -30,6 +33,7 @@ public class TransactionState {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private boolean prepared;
private int preparedResult;
private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
public TransactionState(TransactionId id) {
this.id = id;
@ -78,4 +82,14 @@ public class TransactionState {
return preparedResult;
}
public void addProducerState(ProducerState producerState) {
if (producerState != null) {
producers.put(producerState.getInfo().getProducerId(), producerState);
}
}
public Map<ProducerId, ProducerState> getProducerStates() {
return producers;
}
}

View File

@ -95,6 +95,7 @@ public class FailoverTransport implements CompositeTransport {
private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=1;
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private TransportListener disposedListener = new DefaultTransportListener() {};
@ -233,6 +234,7 @@ public class FailoverTransport implements CompositeTransport {
started = true;
stateTracker.setMaxCacheSize(getMaxCacheSize());
stateTracker.setTrackMessages(isTrackMessages());
stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
if (connectedTransport.get() != null) {
stateTracker.restore(connectedTransport.get());
} else {
@ -372,6 +374,14 @@ public class FailoverTransport implements CompositeTransport {
this.trackMessages = trackMessages;
}
public boolean isTrackTransactionProducers() {
return this.trackTransactionProducers;
}
public void setTrackTransactionProducers(boolean trackTransactionProducers) {
this.trackTransactionProducers = trackTransactionProducers;
}
public int getMaxCacheSize() {
return maxCacheSize;
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2473
public class FailoverTransactionTest {
private static final String QUEUE_NAME = "test.FailoverTransactionTest";
private String url = "tcp://localhost:61616";
BrokerService broker;
@Before
public void startCleanBroker() throws Exception {
startBroker(true);
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.addConnector(url);
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
}
@Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
// close producer before commit, emulate jmstemplate
producer.close();
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false);
session.commit();
assertNotNull("we got the message", consumer.receive(20000));
session.commit();
connection.close();
}
@Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
// close producer before commit, emulate jmstemplate
producer.close();
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false);
session.commit();
// withough tracking producers, message will not be replayed on recovery
assertNull("we got the message", consumer.receive(2000));
session.commit();
connection.close();
}
@Test
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer;
TextMessage message;
final int count = 10;
for (int i=0; i<count; i++) {
producer = session.createProducer(destination);
message = session.createTextMessage("Test message: " + count);
producer.send(message);
producer.close();
}
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker(false);
session.commit();
for (int i=0; i<count; i++) {
assertNotNull("we got all the message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
}
}