mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-2868 - rework to remove sync on transaction completion, cursor updates are now stacked so that they ocurr in order, independent of thread execution after waiting for the journal to complete a write. This ensures that the cursors are updates in the same order as the index while still working wo the index lock. TransactedConsumerTest shows horizontal scaling now works better with transactions. Reworked metadata.lastUpdate to always work with the existing index lock rather than reaquire, this may help with spurious gc journal data file issue on windows - https://issues.apache.org/jira/browse/AMQ-3470
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1163613 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
33eb944e18
commit
c6ed5ff237
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.camel;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageProducer;
|
||||
|
@ -28,7 +27,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
|
@ -88,9 +87,12 @@ public class TransactedConsumeTest extends CamelSpringTestSupport {
|
|||
|
||||
brokerService.setAdvisorySupport(false);
|
||||
brokerService.setDataDirectory("target/data");
|
||||
AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
|
||||
amq.setDirectory(new File("target/data"));
|
||||
brokerService.setPersistenceAdapter(amq);
|
||||
//AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
|
||||
//amq.setDirectory(new File("target/data"));
|
||||
//brokerService.setPersistenceAdapter(amq);
|
||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)
|
||||
brokerService.getPersistenceAdapter();
|
||||
kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
|
||||
brokerService.addConnector("tcp://localhost:61616");
|
||||
return brokerService;
|
||||
}
|
||||
|
|
|
@ -28,22 +28,17 @@
|
|||
<context:annotation-config/>
|
||||
|
||||
<bean id="vhfBatchListenerJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
|
||||
<property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
|
||||
<property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
|
||||
</bean>
|
||||
|
||||
<bean id="vhfBatchListenerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
|
||||
<!-- match maxConnections to the number of routes that share the connection factory -->
|
||||
<property name="maxConnections" value="2"/>
|
||||
<!-- match maximumActive (which is active sessions) to num routes * concurrentConsumers in the MLC -->
|
||||
<property name="maximumActive" value="20"/>
|
||||
<!-- match maxConnections to the number of routes that share the connection factory -->
|
||||
<property name="maxConnections" value="10"/>
|
||||
<!-- match maximumActive (which is active sessions) >= concurrentConsumers in the MLC -->
|
||||
<property name="maximumActive" value="1"/>
|
||||
<property name="connectionFactory" ref="vhfBatchListenerJMSConnectionFactory"/>
|
||||
</bean>
|
||||
|
||||
<!-- bean id="vhfBatchListenerSingleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
|
||||
<property name="reconnectOnException" value="true" />
|
||||
<property name="targetConnectionFactory" ref="vhfBatchListenerJMSConnectionFactory" />
|
||||
</bean -->
|
||||
|
||||
<!-- JMS Transaction manager -->
|
||||
<bean id="vhfBatchListenerJMSTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
|
||||
<property name="connectionFactory" ref="vhfBatchListenerPooledConnectionFactory"/>
|
||||
|
@ -53,9 +48,8 @@
|
|||
<bean id="vhfBatchListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
|
||||
<property name="connectionFactory" ref="vhfBatchListenerPooledConnectionFactory"/>
|
||||
<property name="transactionManager" ref="vhfBatchListenerJMSTransactionManager"/>
|
||||
<property name="receiveTimeout" value="20000" />
|
||||
<property name="transacted" value="true"/>
|
||||
<property name="concurrentConsumers" value="10"/>
|
||||
<property name="concurrentConsumers" value="1"/>
|
||||
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
|
||||
</bean>
|
||||
|
||||
|
@ -72,6 +66,30 @@
|
|||
<bean id="activemq2" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq3" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq4" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq5" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq6" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq7" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq8" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq9" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
<bean id="activemq10" class="org.apache.activemq.camel.component.ActiveMQComponent">
|
||||
<property name="configuration" ref="vhfBatchListenerJMSConfig"/>
|
||||
</bean>
|
||||
|
||||
<camelContext xmlns="http://camel.apache.org/schema/spring">
|
||||
<route>
|
||||
|
@ -79,11 +97,44 @@
|
|||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
|
||||
<!-- better through put with a second route/connection once shared pool config matches concurrentConsumers -->
|
||||
<!-- better through put with a additional route/connection once shared pool config matches concurrentConsumers -->
|
||||
<route>
|
||||
<from uri="activemq2:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq3:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq4:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq5:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq6:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq7:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq8:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq9:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
<route>
|
||||
<from uri="activemq10:queue:scp_transacted"/>
|
||||
<process ref="connectionLog"/>
|
||||
</route>
|
||||
|
||||
</camelContext>
|
||||
|
||||
<bean id="connectionLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>
|
||||
|
|
|
@ -463,7 +463,6 @@
|
|||
<exclude>**/QuickJournalRecoveryBrokerTest.*</exclude>
|
||||
<exclude>**/QuickJournalXARecoveryBrokerTest.*</exclude>
|
||||
<exclude>**/RendezvousDiscoverTransportTest.*</exclude>
|
||||
<exclude>**/MissingDataFileTest.*</exclude>
|
||||
|
||||
<!-- m2 tests failing since move from assembly -->
|
||||
<exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
|
||||
|
|
|
@ -286,13 +286,10 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
} else {
|
||||
KahaTransactionInfo info = getTransactionInfo(txid);
|
||||
// ensure message order w.r.t to cursor and store for setBatch()
|
||||
synchronized (this) {
|
||||
for (Journal journal : theStore.getJournalManager().getJournals()) {
|
||||
theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
|
||||
}
|
||||
forgetRecoveredAcks(txid);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.error("Null transaction passed on commit");
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.LocalTransactionId;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.SubscriptionInfo;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
|
@ -678,7 +677,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
lastRecoveryPosition = nextRecoveryPosition;
|
||||
metadata.lastUpdate = lastRecoveryPosition;
|
||||
JournalCommand<?> message = load(journal, lastRecoveryPosition);
|
||||
process(message, lastRecoveryPosition);
|
||||
process(message, lastRecoveryPosition, (Runnable)null);
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
}
|
||||
} finally {
|
||||
|
@ -779,24 +778,30 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
long start = System.currentTimeMillis();
|
||||
Location location = journal.write(os.toByteSequence(), sync);
|
||||
long start2 = System.currentTimeMillis();
|
||||
process(data, location);
|
||||
process(data, location, after);
|
||||
long end = System.currentTimeMillis();
|
||||
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
|
||||
LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
|
||||
}
|
||||
|
||||
this.indexLock.writeLock().lock();
|
||||
try {
|
||||
metadata.lastUpdate = location;
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
if (after != null) {
|
||||
Runnable afterCompletion = null;
|
||||
synchronized (orderedTransactionAfters) {
|
||||
if (!orderedTransactionAfters.empty()) {
|
||||
afterCompletion = orderedTransactionAfters.pop();
|
||||
}
|
||||
}
|
||||
if (afterCompletion != null) {
|
||||
afterCompletion.run();
|
||||
} else {
|
||||
// non persistent message case
|
||||
after.run();
|
||||
}
|
||||
}
|
||||
|
||||
if (!checkpointThread.isAlive()) {
|
||||
startCheckpoint();
|
||||
}
|
||||
if (after != null) {
|
||||
after.run();
|
||||
}
|
||||
return location;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("KahaDB failed to store to Journal", ioe);
|
||||
|
@ -831,7 +836,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
*/
|
||||
void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
|
||||
if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
|
||||
process(data, location);
|
||||
process(data, location, (Runnable) null);
|
||||
} else {
|
||||
// just recover producer audit
|
||||
data.visit(new Visitor() {
|
||||
|
@ -848,7 +853,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
// from the recovery method too so they need to be idempotent
|
||||
// /////////////////////////////////////////////////////////////////
|
||||
|
||||
void process(JournalCommand<?> data, final Location location) throws IOException {
|
||||
void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
|
||||
data.visit(new Visitor() {
|
||||
@Override
|
||||
public void visit(KahaAddMessageCommand command) throws IOException {
|
||||
|
@ -867,7 +872,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
@Override
|
||||
public void visit(KahaCommitCommand command) throws IOException {
|
||||
process(command, location);
|
||||
process(command, location, after);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -884,6 +889,16 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
public void visit(KahaSubscriptionCommand command) throws IOException {
|
||||
process(command, location);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visit(KahaProducerAuditCommand command) throws IOException {
|
||||
processLocation(location);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visit(KahaTraceCommand command) {
|
||||
processLocation(location);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -950,7 +965,25 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
protected void process(KahaCommitCommand command, Location location) throws IOException {
|
||||
protected void processLocation(final Location location) {
|
||||
this.indexLock.writeLock().lock();
|
||||
try {
|
||||
metadata.lastUpdate = location;
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
|
||||
private void push(Runnable after) {
|
||||
if (after != null) {
|
||||
synchronized (orderedTransactionAfters) {
|
||||
orderedTransactionAfters.push(after);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
|
||||
TransactionId key = key(command.getTransactionInfo());
|
||||
List<Operation> inflightTx;
|
||||
synchronized (inflightTransactions) {
|
||||
|
@ -973,6 +1006,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
});
|
||||
metadata.lastUpdate = location;
|
||||
push(after);
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
|
@ -1046,6 +1081,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
// record this id in any event, initial send or recovery
|
||||
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
||||
metadata.lastUpdate = location;
|
||||
}
|
||||
|
||||
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
|
||||
|
@ -1079,6 +1115,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
}
|
||||
metadata.lastUpdate = ackLocation;
|
||||
}
|
||||
|
||||
Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
|
||||
|
@ -2224,6 +2261,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
cursor.lowPriorityCursorPosition = nextPosition.longValue();
|
||||
}
|
||||
} else {
|
||||
LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
|
||||
lastDefaultKey = sequence;
|
||||
cursor.defaultCursorPosition = nextPosition.longValue();
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import junit.framework.TestCase;
|
|||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -44,7 +45,7 @@ public class MissingDataFileTest extends TestCase {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
|
||||
|
||||
private static int counter = 300;
|
||||
private static int counter = 500;
|
||||
|
||||
private static int hectorToHaloCtr;
|
||||
private static int xenaToHaloCtr;
|
||||
|
@ -94,12 +95,13 @@ public class MissingDataFileTest extends TestCase {
|
|||
|
||||
SystemUsage systemUsage;
|
||||
systemUsage = new SystemUsage();
|
||||
systemUsage.getMemoryUsage().setLimit(1024 * 1024); // Just a few messags
|
||||
systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags
|
||||
broker.setSystemUsage(systemUsage);
|
||||
|
||||
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
factory.setMaxFileLength(2*1024); // ~4 messages
|
||||
factory.setCleanupInterval(1000); // every few second
|
||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
|
||||
kahaDBPersistenceAdapter.setCleanupInterval(500);
|
||||
broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
|
||||
|
||||
broker.start();
|
||||
LOG.info("Starting broker..");
|
||||
|
|
Loading…
Reference in New Issue