first cut of audit recovery for https://issues.apache.org/activemq/browse/AMQ-2540 - test case reproduces. Little more tidy up needed such that the query is limited and such that the audit is recovered at the persistence adapter rather than at the store level - also related to https://issues.apache.org/activemq/browse/AMQ-2473

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@892242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-12-18 13:03:40 +00:00
parent c6de4d7e56
commit 8732f70793
9 changed files with 162 additions and 23 deletions

View File

@ -541,6 +541,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null) {
throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
+ connectionId);
}
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) { if (ss == null) {
throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
@ -576,6 +580,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
ConnectionId connectionId = id.getParentId(); ConnectionId connectionId = id.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null) {
throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
}
SessionState session = cs.getSessionState(id); SessionState session = cs.getSessionState(id);
if (session == null) { if (session == null) {
throw new IllegalStateException("Cannot remove session that had not been registered: " + id); throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
@ -643,7 +650,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
registerConnectionState(info.getConnectionId(), state); registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection: " + getRemoteAddress()); LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
// Setup the context. // Setup the context.
String clientId = info.getClientId(); String clientId = info.getClientId();
context = new ConnectionContext(); context = new ConnectionContext();
@ -680,6 +687,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
throws InterruptedException { throws InterruptedException {
LOG.debug("remove connection id: " + id);
TransportConnectionState cs = lookupConnectionState(id); TransportConnectionState cs = lookupConnectionState(id);
if (cs != null) { if (cs != null) {
// Don't allow things to be added to the connection state while we // Don't allow things to be added to the connection state while we

View File

@ -85,9 +85,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
clearIterator(true); clearIterator(true);
recovered = true; recovered = true;
} else { } else {
if (LOG.isDebugEnabled()) { LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
}
storeHasMessages = true; storeHasMessages = true;
} }
return recovered; return recovered;

View File

@ -118,6 +118,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
// Restore the connections. // Restore the connections.
for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
ConnectionState connectionState = iter.next(); ConnectionState connectionState = iter.next();
if (LOG.isDebugEnabled()) {
LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
}
transport.oneway(connectionState.getInfo()); transport.oneway(connectionState.getInfo());
restoreTempDestinations(transport, connectionState); restoreTempDestinations(transport, connectionState);
@ -173,6 +176,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
// Restore the connection's sessions // Restore the connection's sessions
for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
SessionState sessionState = (SessionState)iter2.next(); SessionState sessionState = (SessionState)iter2.next();
if (LOG.isDebugEnabled()) {
LOG.debug("session: " + sessionState.getInfo().getSessionId());
}
transport.oneway(sessionState.getInfo()); transport.oneway(sessionState.getInfo());
if (restoreProducers) { if (restoreProducers) {
@ -207,6 +213,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
// Restore the session's producers // Restore the session's producers
for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
ProducerState producerState = (ProducerState)iter3.next(); ProducerState producerState = (ProducerState)iter3.next();
if (LOG.isDebugEnabled()) {
LOG.debug("producer: " + producerState.getInfo().getProducerId());
}
transport.oneway(producerState.getInfo()); transport.oneway(producerState.getInfo());
} }
} }

View File

@ -81,4 +81,6 @@ public interface JDBCAdapter {
void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
} }

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import org.apache.activemq.command.MessageId;
public interface JDBCMessageIdScanListener {
boolean messageId(MessageId id);
}

View File

@ -18,8 +18,6 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
@ -28,10 +26,8 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData; import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -57,8 +53,28 @@ public class JDBCMessageStore extends AbstractMessageStore {
this.adapter = adapter; this.adapter = adapter;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.audit = audit; this.audit = audit;
initAudit();
} }
/*
* revisit: This can be destination agnostic and back in the jdbc persistence adapter start
*/
public void initAudit() {
if (audit != null) {
try {
TransactionContext c = persistenceAdapter.getTransactionContext(null);
adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() {
public boolean messageId(MessageId id) {
audit.isDuplicate(id);
return true;
}
});
} catch (Exception e) {
LOG.error("Failed to reload store message audit for queue store " + destination);
}
}
}
public void addMessage(ConnectionContext context, Message message) throws IOException { public void addMessage(ConnectionContext context, Message message) throws IOException {
MessageId messageId = message.getMessageId(); MessageId messageId = message.getMessageId();

View File

@ -145,6 +145,16 @@ public class Statements {
} }
return findAllMessagesStatement; return findAllMessagesStatement;
} }
public String getFindAllMessageIds() {
// this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement
// and work back for X
if (findAllMessagesStatement == null) {
findAllMessagesStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? ORDER BY ID DESC";
}
return findAllMessagesStatement;
}
public String getFindLastSequenceIdInMsgsStatement() { public String getFindLastSequenceIdInMsgsStatement() {
if (findLastSequenceIdInMsgsStatement == null) { if (findLastSequenceIdInMsgsStatement == null) {

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements; import org.apache.activemq.store.jdbc.Statements;
@ -326,6 +327,27 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIds());
s.setString(1, destination.getQualifiedName());
// limit the query. just need the the last few messages that could be replayed
// on recovery. send or commit reply lost so it gets replayed.
rs = s.executeQuery();
while (rs.next()) {
if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) {
break;
}
}
} finally {
close(rs);
close(s);
}
}
public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, long seq) throws SQLException, IOException { String subscriptionName, long seq) throws SQLException, IOException {
PreparedStatement s = c.getUpdateLastAckStatement(); PreparedStatement s = c.getUpdateLastAckStatement();

View File

@ -18,10 +18,14 @@ package org.apache.activemq.transport.failover;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
@ -34,10 +38,10 @@ import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2473 // see https://issues.apache.org/activemq/browse/AMQ-2473
@ -48,7 +52,6 @@ public class FailoverTransactionTest {
private String url = "tcp://localhost:61616"; private String url = "tcp://localhost:61616";
BrokerService broker; BrokerService broker;
@Before
public void startCleanBroker() throws Exception { public void startCleanBroker() throws Exception {
startBroker(true); startBroker(true);
} }
@ -69,13 +72,13 @@ public class FailoverTransactionTest {
broker = new BrokerService(); broker = new BrokerService();
broker.setUseJmx(false); broker.setUseJmx(false);
broker.addConnector(url); broker.addConnector(url);
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
return broker; return broker;
} }
@Test //@Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception { public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection(); Connection connection = cf.createConnection();
connection.start(); connection.start();
@ -103,10 +106,20 @@ public class FailoverTransactionTest {
@Test @Test
public void testFailoverCommitReplyLost() throws Exception { public void testFailoverCommitReplyLost() throws Exception {
doTestFailoverCommitReplyLost(false);
broker.stop(); }
@Test
public void testFailoverCommitReplyLostJdbc() throws Exception {
doTestFailoverCommitReplyLost(true);
}
public void doTestFailoverCommitReplyLost(boolean useJdbcPersistenceAdapter) throws Exception {
broker = createBroker(true); broker = createBroker(true);
if (useJdbcPersistenceAdapter) {
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
}
broker.setPlugins(new BrokerPlugin[] { broker.setPlugins(new BrokerPlugin[] {
new BrokerPluginSupport() { new BrokerPluginSupport() {
@Override @Override
@ -141,13 +154,15 @@ public class FailoverTransactionTest {
TextMessage message = session.createTextMessage("Test message"); TextMessage message = session.createTextMessage("Test message");
producer.send(message); producer.send(message);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart // broker will die on commit reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() { Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() { public void run() {
LOG.info("doing async commit..."); LOG.info("doing async commit...");
try { try {
session.commit(); session.commit();
commitDoneLatch.countDown();
LOG.info("done async commit"); LOG.info("done async commit");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -155,18 +170,54 @@ public class FailoverTransactionTest {
} }
}); });
// will be stopped by the plugin
broker.waitUntilStopped(); broker.waitUntilStopped();
startBroker(false); broker = createBroker(false);
if (useJdbcPersistenceAdapter) {
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
}
broker.start();
assertNotNull("we got the message", consumer.receive(20000)); assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
assertNotNull("we got the message", msg);
assertNull("we got just one message", consumer.receive(2000)); assertNull("we got just one message", consumer.receive(2000));
session.commit(); session.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
broker = createBroker(false);
if (useJdbcPersistenceAdapter) {
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
}
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session2.createConsumer(destination);
msg = consumer.receive(1000);
if (msg == null) {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close(); connection.close();
} }
@Test @Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
Connection connection = cf.createConnection(); Connection connection = cf.createConnection();
connection.start(); connection.start();
@ -188,15 +239,15 @@ public class FailoverTransactionTest {
session.commit(); session.commit();
// withough tracking producers, message will not be replayed on recovery // without tracking producers, message will not be replayed on recovery
assertNull("we got the message", consumer.receive(2000)); assertNull("we got the message", consumer.receive(5000));
session.commit(); session.commit();
connection.close(); connection.close();
} }
@Test @Test
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
Connection connection = cf.createConnection(); Connection connection = cf.createConnection();
connection.start(); connection.start();