- The ack to the message store was being sent after the transaction commit. Not good. Fixed so that the ack gets sent to the message store

as it comes in.  This fixes teh failing jpa tests.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@643529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-04-01 19:35:48 +00:00
parent 6f851c8cce
commit 00fb444810
9 changed files with 114 additions and 55 deletions

View File

@ -21,35 +21,85 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<!-- Default configuration -->
<broker useJmx="false" xmlns="http://activemq.org/config/1.0">
<broker brokerName="broker1" useJmx="true" persistent="false" xmlns="http://activemq.org/config/1.0" useShutdownHook="false" monitorConnectionSplits="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<persistenceAdapter>
<journaledJDBC journalLogFiles="2" dataDirectory="target/foo"/>
</persistenceAdapter>
</broker>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!-- Example of broker configuration that uses new logging options and dynamic management of logging
<broker useJmx="true" xmlns="http://activemq.org/config/1.0" persistent="false" deleteAllMessagesOnStartup="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616?trace=true&amp;logWriterName=custom&amp;dynamicManagement=true&amp;startLogging=true"/>
</transportConnectors>
<persistenceAdapter>
<memoryPersistenceAdapter/>
</persistenceAdapter>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61626?socketBufferSize=256000)" userName="foo" password="bar" dynamicOnly="false" decreaseNetworkConsumerPriority="true">
<excludedDestinations>
<topic physicalName="bbm.batch.1"/>
<topic physicalName="intl.service.status"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616?socketBufferSize=256000"/>
<transportConnector uri="tcp://localhost:61618?socketBufferSize=256000"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" producerFlowControl="false">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" producerFlowControl="false">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic="intl.bbm.batch.>" minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" producerFlowControl="false">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
<!--
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
</pendingMessageLimitStrategy>
-->
<subscriptionRecoveryPolicy>
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="bbm.batch.>" minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" producerFlowControl="false">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
<!--
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="${activemq.pending.message.limit}"/>
</pendingMessageLimitStrategy>
-->
<subscriptionRecoveryPolicy>
<fixedCountSubscriptionRecoveryPolicy maximumSize="10"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="intl.service.status" minimumMessageSize="1" optimizedDispatch="true" lazyDispatch="false" producerFlowControl="false">
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="1gb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
</broker>
End of example-->
<!-- Note: the jmxPort=portnumber option on transportConnectors should only be used on clients.
On brokers, there is a default port (usually 1099) -->
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
log4j.rootLogger=DEBUG, stdout
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.activemq.spring=WARN
# CONSOLE appender, not used by default

View File

@ -996,14 +996,36 @@ public class Queue extends BaseDestination implements Task {
removeMessage(c, null, r, ack);
}
protected void removeMessage(ConnectionContext context,Subscription sub,QueueMessageReference reference,MessageAck ack) throws IOException {
reference.drop();
protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
if (!ack.isInTransaction()) {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
wakeup();
} else {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
wakeup();
}
public void afterRollback() throws Exception {
reference.setAcked(false);
}
});
}
wakeup();
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference reference) {

View File

@ -51,20 +51,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
final Destination q = n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q;
if (!ack.isInTransaction()) {
queue.removeMessage(context, this, node, ack);
} else {
node.setAcked(true);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
queue.removeMessage(context, QueueSubscription.this, node, ack);
}
public void afterRollback() throws Exception {
node.setAcked(false);
}
});
}
queue.removeMessage(context, this, node, ack);
}
protected boolean canDispatch(MessageReference n) throws IOException {

View File

@ -39,7 +39,7 @@ public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;
@ -53,7 +53,7 @@ public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;

View File

@ -40,7 +40,7 @@ public class QuickJPAStoreRecoveryBrokerTest extends RecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
@ -57,7 +57,7 @@ public class QuickJPAStoreRecoveryBrokerTest extends RecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);

View File

@ -48,7 +48,7 @@ public class QuickJPAStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
@ -65,7 +65,7 @@ public class QuickJPAStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);

View File

@ -36,7 +36,7 @@
<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
<!-- <prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop> -->
</props>
</property>
</bean>

View File

@ -39,7 +39,7 @@
<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
<!-- <prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop> -->
</props>
</property>
</bean>