mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2437 - jdbc recovery
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@821106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1d5bcaf274
commit
4e1d383555
|
@ -738,6 +738,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
this.lastRecoveredMessagesIds.add(id);
|
||||
} else {
|
||||
LOG.debug("Stopped recover next messages");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -753,6 +754,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
|
|||
this.lastRecoveredMessagesIds.add(id);
|
||||
} else {
|
||||
LOG.debug("Stopped recover next messages");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ public class ConsumerThread extends Thread {
|
|||
private boolean running;
|
||||
private Log log = LogFactory.getLog(ConsumerThread.class);
|
||||
private int numberOfQueues;
|
||||
private String consumerName;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -69,7 +70,7 @@ public class ConsumerThread extends Thread {
|
|||
}
|
||||
|
||||
private DefaultMessageListenerContainer createContainer() {
|
||||
Random generator = new Random();
|
||||
Random generator = new Random(consumerName.hashCode());
|
||||
int queueSuffix = generator.nextInt(numberOfQueues);
|
||||
|
||||
|
||||
|
@ -94,6 +95,7 @@ public class ConsumerThread extends Thread {
|
|||
}
|
||||
*/
|
||||
container.afterPropertiesSet();
|
||||
log.info("subscribing to " + destination + queueSuffix);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -125,6 +127,11 @@ public class ConsumerThread extends Thread {
|
|||
public int getNumberOfQueues() {
|
||||
return this.numberOfQueues;
|
||||
}
|
||||
|
||||
|
||||
public void setConsumerName(String name) {
|
||||
this.consumerName = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param connectionFactory the connectionFactory to set
|
||||
|
|
|
@ -36,9 +36,9 @@ public class JDBCSpringTest extends TestCase {
|
|||
|
||||
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
|
||||
|
||||
int numberOfConsumerThreads = 50;
|
||||
int numberOfProducerThreads = 50;
|
||||
int numberOfMessages = 100;
|
||||
int numberOfConsumerThreads = 20;
|
||||
int numberOfProducerThreads = 20;
|
||||
int numberOfMessages = 50;
|
||||
int numberOfQueues = 5;
|
||||
String url = "tcp://localhost:61616";
|
||||
|
||||
|
@ -46,7 +46,6 @@ public class JDBCSpringTest extends TestCase {
|
|||
|
||||
public void setUp() throws Exception {
|
||||
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
|
||||
//broker.deleteAllMessages();
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
}
|
||||
|
@ -87,7 +86,7 @@ public class JDBCSpringTest extends TestCase {
|
|||
thread.setMessage(twoKbMessage);
|
||||
thread.setNumberOfMessagesToSend(numberOfMessages);
|
||||
thread.setNumberOfQueues(numberOfQueues);
|
||||
thread.setQueuePrefix("DEV-1786.queue.");
|
||||
thread.setQueuePrefix("AMQ-2436.queue.");
|
||||
thread.setConnectionFactory(connectionFactory);
|
||||
thread.setSendDelay(100);
|
||||
ProducerThreads.add(thread);
|
||||
|
@ -100,10 +99,11 @@ public class JDBCSpringTest extends TestCase {
|
|||
thread.setMessageDrivenPojo(mdp1);
|
||||
thread.setConcurrentConsumers(1);
|
||||
thread.setConnectionFactory(connectionFactory);
|
||||
thread.setDestination("DEV-1786.queue.");
|
||||
thread.setDestination("AMQ-2436.queue.");
|
||||
thread.setPubSubDomain(false);
|
||||
thread.setSessionTransacted(true);
|
||||
thread.setNumberOfQueues(numberOfQueues);
|
||||
thread.setConsumerName("consumer" + i);
|
||||
ConsumerThreads.add(thread);
|
||||
thread.start();
|
||||
|
||||
|
@ -121,6 +121,7 @@ public class JDBCSpringTest extends TestCase {
|
|||
|
||||
boolean finished = false;
|
||||
int retry = 0;
|
||||
int previous = 0;
|
||||
while (!finished) {
|
||||
|
||||
int totalMessages = 0;
|
||||
|
@ -128,8 +129,17 @@ public class JDBCSpringTest extends TestCase {
|
|||
for (Thread thread : ConsumerThreads) {
|
||||
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
|
||||
}
|
||||
log.info(totalMessages + " received so far...");
|
||||
if (totalMessages != 0 && previous == totalMessages) {
|
||||
for (Thread thread : ConsumerThreads) {
|
||||
((ConsumerThread)thread).setRun(false);
|
||||
}
|
||||
Thread.sleep(3000);
|
||||
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
|
||||
}
|
||||
previous = totalMessages;
|
||||
|
||||
if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
|
||||
if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
|
||||
finished = true;
|
||||
log.info("Received all " + totalMessages + " messages. Finishing.");
|
||||
|
||||
|
@ -141,11 +151,7 @@ public class JDBCSpringTest extends TestCase {
|
|||
}
|
||||
|
||||
} else {
|
||||
if (retry == 10) {
|
||||
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
|
||||
}
|
||||
Thread.sleep(10000);
|
||||
log.info(totalMessages + " received so far...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,16 +23,27 @@
|
|||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
|
||||
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
|
||||
|
||||
<persistenceAdapter>
|
||||
<jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
|
||||
<jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
|
||||
</persistenceAdapter>
|
||||
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry queue=">" memoryLimit="10240"/>
|
||||
<policyEntry topic=">" memoryLimit="10240">
|
||||
</policyEntry>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
|
||||
|
||||
<systemUsage>
|
||||
<systemUsage>
|
||||
<memoryUsage>
|
||||
<memoryUsage limit="20 mb"/>
|
||||
<memoryUsage limit="102400"/>
|
||||
</memoryUsage>
|
||||
<storeUsage>
|
||||
<storeUsage limit="1 gb" name="foo"/>
|
||||
|
|
Loading…
Reference in New Issue