diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index b367fa5571..7c081fba7b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -541,6 +541,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SessionId sessionId = id.getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " + + connectionId); + } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " @@ -576,6 +580,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { ConnectionId connectionId = id.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); + } SessionState session = cs.getSessionState(id); if (session == null) { throw new IllegalStateException("Cannot remove session that had not been registered: " + id); @@ -643,7 +650,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } registerConnectionState(info.getConnectionId(), state); - LOG.debug("Setting up new connection: " + getRemoteAddress()); + LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); // Setup the context. String clientId = info.getClientId(); context = new ConnectionContext(); @@ -680,6 +687,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws InterruptedException { + LOG.debug("remove connection id: " + id); TransportConnectionState cs = lookupConnectionState(id); if (cs != null) { // Don't allow things to be added to the connection state while we diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index e205d1cf5b..b7f897b12d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -85,9 +85,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i clearIterator(true); recovered = true; } else { - if (LOG.isDebugEnabled()) { - LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); - } + LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); storeHasMessages = true; } return recovered; diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index be9558a3d0..85b7162b94 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -118,6 +118,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the connections. for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { ConnectionState connectionState = iter.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); + } transport.oneway(connectionState.getInfo()); restoreTempDestinations(transport, connectionState); @@ -173,6 +176,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the connection's sessions for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { SessionState sessionState = (SessionState)iter2.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("session: " + sessionState.getInfo().getSessionId()); + } transport.oneway(sessionState.getInfo()); if (restoreProducers) { @@ -207,6 +213,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the session's producers for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { ProducerState producerState = (ProducerState)iter3.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("producer: " + producerState.getInfo().getProducerId()); + } transport.oneway(producerState.getInfo()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index e12ab5a69d..37a07a8787 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -81,4 +81,6 @@ public interface JDBCAdapter { void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; + + void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java new file mode 100755 index 0000000000..4b3e569ed8 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + +import org.apache.activemq.command.MessageId; + +public interface JDBCMessageIdScanListener { + boolean messageId(MessageId id); +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index d6a43ac62d..f342d2e7a2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -18,8 +18,6 @@ package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.ActiveMQMessageAudit; @@ -28,10 +26,8 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequenceData; import org.apache.activemq.util.IOExceptionSupport; @@ -57,8 +53,28 @@ public class JDBCMessageStore extends AbstractMessageStore { this.adapter = adapter; this.wireFormat = wireFormat; this.audit = audit; + initAudit(); } + /* + * revisit: This can be destination agnostic and back in the jdbc persistence adapter start + */ + public void initAudit() { + if (audit != null) { + try { + TransactionContext c = persistenceAdapter.getTransactionContext(null); + adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() { + public boolean messageId(MessageId id) { + audit.isDuplicate(id); + return true; + } + }); + } catch (Exception e) { + LOG.error("Failed to reload store message audit for queue store " + destination); + } + } + } + public void addMessage(ConnectionContext context, Message message) throws IOException { MessageId messageId = message.getMessageId(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index d73d1f0a36..b05c613ceb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -145,6 +145,16 @@ public class Statements { } return findAllMessagesStatement; } + + public String getFindAllMessageIds() { + // this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement + // and work back for X + if (findAllMessagesStatement == null) { + findAllMessagesStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName() + + " WHERE CONTAINER=? ORDER BY ID DESC"; + } + return findAllMessagesStatement; + } public String getFindLastSequenceIdInMsgsStatement() { if (findLastSequenceIdInMsgsStatement == null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 030b337ddb..e08e970e6b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.jdbc.JDBCAdapter; +import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.Statements; @@ -326,6 +327,27 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } + public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, + JDBCMessageIdScanListener listener) throws SQLException, IOException { + PreparedStatement s = null; + ResultSet rs = null; + try { + s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIds()); + s.setString(1, destination.getQualifiedName()); + // limit the query. just need the the last few messages that could be replayed + // on recovery. send or commit reply lost so it gets replayed. + rs = s.executeQuery(); + while (rs.next()) { + if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) { + break; + } + } + } finally { + close(rs); + close(s); + } + } + public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, IOException { PreparedStatement s = c.getUpdateLastAckStatement(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 97c1a60da3..ba5b001e4d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -18,10 +18,14 @@ package org.apache.activemq.transport.failover; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -34,10 +38,10 @@ import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.TransactionId; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; -import org.junit.Before; import org.junit.Test; // see https://issues.apache.org/activemq/browse/AMQ-2473 @@ -48,7 +52,6 @@ public class FailoverTransactionTest { private String url = "tcp://localhost:61616"; BrokerService broker; - @Before public void startCleanBroker() throws Exception { startBroker(true); } @@ -69,13 +72,13 @@ public class FailoverTransactionTest { broker = new BrokerService(); broker.setUseJmx(false); broker.addConnector(url); - broker.setDeleteAllMessagesOnStartup(true); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); return broker; } - @Test + //@Test public void testFailoverProducerCloseBeforeTransaction() throws Exception { - + startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); Connection connection = cf.createConnection(); connection.start(); @@ -103,10 +106,20 @@ public class FailoverTransactionTest { @Test public void testFailoverCommitReplyLost() throws Exception { - - broker.stop(); + doTestFailoverCommitReplyLost(false); + } + + @Test + public void testFailoverCommitReplyLostJdbc() throws Exception { + doTestFailoverCommitReplyLost(true); + } + + public void doTestFailoverCommitReplyLost(boolean useJdbcPersistenceAdapter) throws Exception { broker = createBroker(true); + if (useJdbcPersistenceAdapter) { + broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); + } broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { @Override @@ -141,13 +154,15 @@ public class FailoverTransactionTest { TextMessage message = session.createTextMessage("Test message"); producer.send(message); - + + final CountDownLatch commitDoneLatch = new CountDownLatch(1); // broker will die on commit reply so this will hang till restart Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); try { session.commit(); + commitDoneLatch.countDown(); LOG.info("done async commit"); } catch (Exception e) { e.printStackTrace(); @@ -155,18 +170,54 @@ public class FailoverTransactionTest { } }); + // will be stopped by the plugin broker.waitUntilStopped(); - startBroker(false); + broker = createBroker(false); + if (useJdbcPersistenceAdapter) { + broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); + } + broker.start(); - assertNotNull("we got the message", consumer.receive(20000)); + assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + + // new transaction + Message msg = consumer.receive(20000); + LOG.info("Received: " + msg); + assertNotNull("we got the message", msg); assertNull("we got just one message", consumer.receive(2000)); - session.commit(); + session.commit(); + consumer.close(); + connection.close(); + + // ensure no dangling messages with fresh broker etc + broker.stop(); + broker.waitUntilStopped(); + + LOG.info("Checking for remaining/hung messages.."); + broker = createBroker(false); + if (useJdbcPersistenceAdapter) { + broker.setPersistenceAdapter(new JDBCPersistenceAdapter()); + } + broker.start(); + + // after restart, ensure no dangling messages + cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + connection = cf.createConnection(); + connection.start(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session2.createConsumer(destination); + msg = consumer.receive(1000); + if (msg == null) { + msg = consumer.receive(5000); + } + LOG.info("Received: " + msg); + assertNull("no messges left dangling but got: " + msg, msg); connection.close(); } @Test public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { - + startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); Connection connection = cf.createConnection(); connection.start(); @@ -188,15 +239,15 @@ public class FailoverTransactionTest { session.commit(); - // withough tracking producers, message will not be replayed on recovery - assertNull("we got the message", consumer.receive(2000)); + // without tracking producers, message will not be replayed on recovery + assertNull("we got the message", consumer.receive(5000)); session.commit(); connection.close(); } @Test public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { - + startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); Connection connection = cf.createConnection(); connection.start();