git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@906054 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-02-03 14:07:08 +00:00
parent 568ab1ebe6
commit 5170a8bba3
8 changed files with 227 additions and 28 deletions

View File

@ -498,6 +498,9 @@
<exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
<!-- used just to test potential memory leaks manually -->
<exclude>**/JDBCTestMemory.*</exclude>
<exclude>**/amq1490/*</exclude>
<exclude>**/AMQ1925*</exclude>
<exclude>**/archive/*</exclude>

View File

@ -86,7 +86,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
clearIterator(true);
recovered = true;
} else {
LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
}
storeHasMessages = true;
}
return recovered;
@ -160,6 +162,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
recoverMessage(node.getMessage(),true);
lastCachedId = node.getMessageId();
} else {
if (lastCachedId != null && node.getMessageId().getBrokerSequenceId() < lastCachedId.getBrokerSequenceId()) {
lastCachedId = node.getMessageId();
setBatch(lastCachedId);
}
if (cacheEnabled) {
cacheEnabled=false;
if (LOG.isDebugEnabled()) {

View File

@ -231,8 +231,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
lastMessageId.set(sequenceId);
if (listener.recoverMessage(msg)) {
lastMessageId.set(sequenceId);
}
return true;
}
return false;

View File

@ -345,7 +345,7 @@ public class Statements {
public String getFindNextMessagesStatement() {
if (findNextMessagesStatement == null) {
findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+ " WHERE CONTAINER=? AND ID >= ? ORDER BY ID";
}
return findNextMessagesStatement;
}

View File

@ -742,9 +742,6 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
long id = 0;
List<Long> cleanupIds = new ArrayList<Long>();
int index = 0;
try {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
s.setMaxRows(maxReturned * 2);
@ -754,15 +751,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned) {
id = rs.getLong(1);
if (this.lastRecoveredMessagesIds.contains(id)) {
// this message was already recovered
cleanupIds.add(id);
continue;
}
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
break;
@ -770,27 +760,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
}
} else {
while (rs.next() && count < maxReturned) {
id = rs.getLong(1);
if (this.lastRecoveredMessagesIds.contains(id)) {
// this message was already recovered
cleanupIds.add(id);
continue;
}
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
break;
}
}
}
// not cleanup the list of recovered messages
index = 0;
Iterator<Long> it = cleanupIds.iterator();
while (it.hasNext() && index < count) {
this.lastRecoveredMessagesIds.remove(it.next());
}
} catch (Exception e) {
e.printStackTrace();
} finally {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
@ -155,7 +156,7 @@ public class NegativeQueueTest extends TestCase {
MessageProducer producer = session.createProducer(queue);
List<TextMessage> senderList = new ArrayList<TextMessage>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
TextMessage msg = session.createTextMessage(formatter.format(new Date()));
TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date()));
senderList.add(msg);
producer.send(msg);
if(TRANSACTED) session.commit();
@ -268,6 +269,7 @@ public class NegativeQueueTest extends TestCase {
super.tearDown();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@ -285,6 +287,7 @@ public class NegativeQueueTest extends TestCase {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
answer.waitUntilStarted();
return answer;
}

View File

@ -0,0 +1,75 @@
package org.apache.activemq.store.jdbc;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.cursors.NegativeQueueTest;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCNegativeQueueTest extends NegativeQueueTest {
EmbeddedDataSource dataSource;
protected void configureBroker(BrokerService answer) throws Exception {
super.configureBroker(answer);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
answer.setPersistenceAdapter(jdbc);
}
protected void tearDown() throws Exception {
/*Connection conn = dataSource.getConnection();
printQuery(conn, "Select * from ACTIVEMQ_MSGS", System.out); */
super.tearDown();
}
private void printQuery(Connection c, String query, PrintStream out)
throws SQLException {
printQuery(c.prepareStatement(query), out);
}
private void printQuery(PreparedStatement s, PrintStream out)
throws SQLException {
ResultSet set = null;
try {
set = s.executeQuery();
ResultSetMetaData metaData = set.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1)
out.print("||");
out.print(metaData.getColumnName(i) + "||");
}
out.println();
while (set.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1)
out.print("|");
out.print(set.getString(i) + "|");
}
out.println();
}
} finally {
try {
set.close();
} catch (Throwable ignore) {
}
try {
s.close();
} catch (Throwable ignore) {
}
}
}
}

View File

@ -0,0 +1,134 @@
package org.apache.activemq.store.jdbc;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCTestMemory extends TestCase {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn;
Session sess;
Destination dest;
BrokerService broker;
protected void setUp() throws Exception {
broker = createBroker();
broker.start();
broker.waitUntilStarted();
}
protected void tearDown() throws Exception {
broker.stop();
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(true);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);
broker.addConnector("tcp://0.0.0.0:61616");
return broker;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(true);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
broker.setPersistenceAdapter(jdbc);
broker.addConnector("tcp://0.0.0.0:61616");
return broker;
}
public void init() throws Exception {
conn = factory.createConnection();
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = sess.createQueue("test");
}
public void testRecovery() throws Exception {
init();
MessageProducer producer = sess.createProducer(dest);
for (int i = 0; i < 1000; i++) {
producer.send(sess.createTextMessage("test"));
}
producer.close();
sess.close();
conn.close();
broker.stop();
broker.waitUntilStopped();
broker = createRestartedBroker();
broker.start();
broker.waitUntilStarted();
init();
for (int i = 0; i < 10; i++) {
new Thread("Producer " + i) {
public void run() {
try {
MessageProducer producer = sess.createProducer(dest);
for (int i = 0; i < 15000; i++) {
producer.send(sess.createTextMessage("test"));
if (i % 100 == 0) {
System.out.println(getName() + " sent message " + i);
}
}
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
new Thread("Consumer " + i) {
public void run() {
try {
MessageConsumer consumer = sess.createConsumer(dest);
for (int i = 0; i < 15000; i++) {
consumer.receive(2000);
if (i % 100 == 0) {
System.out.println(getName() + " received message " + i);
}
}
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
// Check out JConsole
System.in.read();
sess.close();
conn.close();
}
}