merging 820268,821106 - jdbc recovery

git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@821112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-10-02 17:50:55 +00:00
parent 2b50ad6378
commit 4ae5d3b704
8 changed files with 659 additions and 3 deletions

View File

@ -22,13 +22,11 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -57,7 +55,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class); private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements; protected Statements statements;
protected boolean batchStatments = true; protected boolean batchStatments = true;
private Set<Long> lastRecoveredMessagesIds = Collections.synchronizedSet(new TreeSet<Long>()); private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data); s.setBytes(index, data);
@ -738,6 +736,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
this.lastRecoveredMessagesIds.add(id); this.lastRecoveredMessagesIds.add(id);
} else { } else {
LOG.debug("Stopped recover next messages"); LOG.debug("Stopped recover next messages");
break;
} }
} }
} else { } else {
@ -753,6 +752,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
this.lastRecoveredMessagesIds.add(id); this.lastRecoveredMessagesIds.add(id);
} else { } else {
LOG.debug("Stopped recover next messages"); LOG.debug("Stopped recover next messages");
break;
} }
} }
} }

View File

@ -94,6 +94,21 @@
<artifactId>spring-jms</artifactId> <artifactId>spring-jms</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class ConsumerThread extends Thread {
private DefaultMessageListenerContainer container;
private MessageDrivenPojo messageListener;
private boolean run;
private String destination;
private ConnectionFactory connectionFactory;
private boolean durable;
private int concurrentConsumers;
private boolean sessionTransacted;
private boolean pubSubDomain;
private boolean running;
private Log log = LogFactory.getLog(ConsumerThread.class);
private int numberOfQueues;
private String consumerName;
@Override
public void run() {
run = true;
createContainer();
container.initialize();
container.start();
running = true;
while (run) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
container.stop();
container.destroy();
if (connectionFactory instanceof SingleConnectionFactory) {
((SingleConnectionFactory)connectionFactory).destroy();
}
log.info("ConsumerThread1 closing down");
}
private DefaultMessageListenerContainer createContainer() {
Random generator = new Random(consumerName.hashCode());
int queueSuffix = generator.nextInt(numberOfQueues);
container = new DefaultMessageListenerContainer();
container.setPubSubDomain(pubSubDomain);
container.setDestinationName(destination + queueSuffix);
container.setMessageListener(messageListener);
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(concurrentConsumers);
container.setSessionTransacted(sessionTransacted);
//container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
//container.setMaxConcurrentConsumers(concurrentConsumers);
//container.setAcceptMessagesWhileStopping(false);
//container.setAutoStartup(false);
//without setting a tx manager, this will use local JMS tx.
/*
if (durable) {
container.setSubscriptionDurable(true);
container.setDurableSubscriptionName("ConsumerThread1" + Thread.currentThread().getId());
}
*/
container.afterPropertiesSet();
log.info("subscribing to " + destination + queueSuffix);
return container;
}
/**
* @param messageListener the messageListener to set
*/
public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
this.messageListener = messageListener;
}
/**
* @param run the run to set
*/
public void setRun(boolean run) {
this.run = run;
}
/**
* @param destination the destination to set
*/
public void setDestination(String destination) {
this.destination = destination;
}
public void setNumberOfQueues(int no) {
this.numberOfQueues = no;
}
public int getNumberOfQueues() {
return this.numberOfQueues;
}
public void setConsumerName(String name) {
this.consumerName = name;
}
/**
* @param connectionFactory the connectionFactory to set
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param durable the durable to set
*/
public void setDurable(boolean durable) {
this.durable = durable;
}
/**
* @param concurrentConsumers the concurrentConsumers to set
*/
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
/**
* @param sessionTransacted the sessionTransacted to set
*/
public void setSessionTransacted(boolean sessionTransacted) {
this.sessionTransacted = sessionTransacted;
}
/**
* @param pubSubDomain the pubSubDomain to set
*/
public void setPubSubDomain(boolean pubSubDomain) {
this.pubSubDomain = pubSubDomain;
}
/**
* @return the messageListener
*/
public MessageDrivenPojo getMessageDrivenPojo() {
return messageListener;
}
public boolean isRunning() {
return running;
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class JDBCSpringTest extends TestCase {
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
int numberOfConsumerThreads = 20;
int numberOfProducerThreads = 20;
int numberOfMessages = 50;
int numberOfQueues = 5;
String url = "tcp://localhost:61616";
BrokerService broker;
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
broker.start();
broker.waitUntilStarted();
}
protected void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public void testJDBCSpringTest() throws Exception {
log.info("Using " + numberOfConsumerThreads + " consumers, " +
numberOfProducerThreads + " producers, " +
numberOfMessages + " messages per publisher, and " +
numberOfQueues + " queues.");
ConnectionFactory connectionFactory;
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setQueuePrefetch(1);
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(url);
amq.setPrefetchPolicy(prefetch);
connectionFactory = new PooledConnectionFactory(amq);
((PooledConnectionFactory)connectionFactory).setMaxConnections(5);
StringBuffer buffer = new StringBuffer();
for (int i=0; i<2048; i++) {
buffer.append(".");
}
String twoKbMessage = buffer.toString();
List<ProducerThread> ProducerThreads = new ArrayList<ProducerThread>();
for (int i=0; i<numberOfProducerThreads; i++) {
ProducerThread thread = new ProducerThread();
thread.setMessage(twoKbMessage);
thread.setNumberOfMessagesToSend(numberOfMessages);
thread.setNumberOfQueues(numberOfQueues);
thread.setQueuePrefix("AMQ-2436.queue.");
thread.setConnectionFactory(connectionFactory);
thread.setSendDelay(100);
ProducerThreads.add(thread);
}
List<Thread> ConsumerThreads = new ArrayList<Thread>();
for (int i=0; i<numberOfConsumerThreads; i++) {
ConsumerThread thread = new ConsumerThread();
MessageDrivenPojo mdp1 = new MessageDrivenPojo();
thread.setMessageDrivenPojo(mdp1);
thread.setConcurrentConsumers(1);
thread.setConnectionFactory(connectionFactory);
thread.setDestination("AMQ-2436.queue.");
thread.setPubSubDomain(false);
thread.setSessionTransacted(true);
thread.setNumberOfQueues(numberOfQueues);
thread.setConsumerName("consumer" + i);
ConsumerThreads.add(thread);
thread.start();
while (!thread.isRunning()) {
Thread.sleep(200);
}
}
Thread.sleep(5000);
for (ProducerThread thread : ProducerThreads) {
thread.start();
}
boolean finished = false;
int retry = 0;
int previous = 0;
while (!finished) {
int totalMessages = 0;
retry++;
for (Thread thread : ConsumerThreads) {
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
}
log.info(totalMessages + " received so far...");
if (totalMessages != 0 && previous == totalMessages) {
for (Thread thread : ConsumerThreads) {
((ConsumerThread)thread).setRun(false);
}
Thread.sleep(3000);
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
}
previous = totalMessages;
if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
finished = true;
log.info("Received all " + totalMessages + " messages. Finishing.");
for (Thread thread : ConsumerThreads) {
((ConsumerThread)thread).setRun(false);
}
for (Thread thread : ConsumerThreads) {
thread.join();
}
} else {
Thread.sleep(10000);
}
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageDrivenPojo implements MessageListener, Serializable {
private Log log = LogFactory.getLog(MessageDrivenPojo.class);
private AtomicInteger messageCount = new AtomicInteger();
/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
messageCount.incrementAndGet();
if (log.isDebugEnabled()) {
try {
logMessage(message);
} catch (Exception e) {
log.error("Error:", e);
}
}
try {
Thread.sleep(200);
} catch (InterruptedException ex ) {
log.error(ex);
}
}
private void logMessage(Message message) throws Exception {
StringBuffer buffer = new StringBuffer();
buffer.append("\nJMSMessageID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nJMSCorrelationID:");
buffer.append(message.getJMSMessageID());
buffer.append("\nMessage Contents:\n");
if (message instanceof TextMessage) {
buffer.append(((TextMessage)message).getText());
} else {
buffer.append(message.toString());
}
log.debug(buffer.toString());
}
/**
* @return the stats
*/
protected int getMessageCount() {
return messageCount.get();
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Random;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ProducerThread extends Thread {
private JmsTemplate jmsTemplate;
private int numberOfTopics;
private int numberOfMessagesToSend;
private int messagesSent;
private Random generator = new Random();
private String queuePrefix;
private ConnectionFactory connectionFactory;
private String message;
private MessageCreator messageCreator;
private int sendDelay;
private Log log = LogFactory.getLog(ProducerThread.class);
@Override
public void run() {
initialize();
while (messagesSent < numberOfMessagesToSend) {
int queueSuffix = generator.nextInt(numberOfTopics);
jmsTemplate.send(queuePrefix + queueSuffix, messageCreator);
messagesSent++;
log.debug(Thread.currentThread().getName() +
": sent msg #" + messagesSent);
try {
Thread.sleep(sendDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("ProducerThread shutting down.");
}
private void initialize() {
jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(false);
jmsTemplate.setConnectionFactory(connectionFactory);
messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
}
/**
* @param numberOfTopics the numberOfTopics to set
*/
protected void setNumberOfQueues(int numberOfTopics) {
this.numberOfTopics = numberOfTopics;
}
/**
* @param queuePrefix the queuePrefix to set
*/
protected void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
/**
* @param connectionFactory the connectionFactory to set
*/
protected void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* @param message the message to set
*/
protected void setMessage(String message) {
this.message = message;
}
/**
* @param numberOfMessagesToSend the numberOfMessagesToSend to set
*/
protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
this.numberOfMessagesToSend = numberOfMessagesToSend;
}
public void setSendDelay(int sendDelay) {
this.sendDelay = sendDelay;
}
public int getMessagesSent() {
return messagesSent;
}
}

View File

@ -0,0 +1,69 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
</persistenceAdapter>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" memoryLimit="10240"/>
<policyEntry topic=">" memoryLimit="10240">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="102400"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
</transportConnectors>
</broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
</beans>
<!-- END SNIPPET: example -->

View File

@ -0,0 +1,35 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true