diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8df11e2f6a..047c9e337d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -85,13 +85,7 @@ import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.DataByteArrayInputStream; -import org.apache.activemq.util.DataByteArrayOutputStream; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -231,6 +225,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean enableIndexDiskSyncs = true; private boolean enableIndexRecoveryFile = true; private boolean enableIndexPageCaching = true; + ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); public MessageDatabase() { } @@ -393,20 +388,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void close() throws IOException, InterruptedException { if( opened.compareAndSet(true, false)) { - this.indexLock.writeLock().lock(); + checkpointLock.writeLock().lock(); try { if (metadata.page != null) { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, true); - } - }); + checkpointUpdate(true); } pageFile.unload(); metadata = new Metadata(); } finally { - this.indexLock.writeLock().unlock(); + checkpointLock.writeLock().unlock(); } journal.close(); synchronized (checkpointThreadLock) { @@ -844,16 +834,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if( !opened.get() ) { return; } - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); - } - }); } finally { this.indexLock.writeLock().unlock(); } - + checkpointUpdate(cleanup); long end = System.currentTimeMillis(); if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { @@ -862,21 +846,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - public void checkpoint(Callback closure) throws Exception { - this.indexLock.writeLock().lock(); - try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, false); - } - }); - closure.execute(); - } finally { - this.indexLock.writeLock().unlock(); - } - } - public ByteSequence toByteSequence(JournalCommand data) throws IOException { int size = data.serializedSizeFramed(); DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); @@ -912,17 +881,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } try { ByteSequence sequence = toByteSequence(data); - long start = System.currentTimeMillis(); - Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; - long start2 = System.currentTimeMillis(); - process(data, location, after); - long end = System.currentTimeMillis(); - if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { - if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); - } - } + Location location; + checkpointLock.readLock().lock(); + try { + + long start = System.currentTimeMillis(); + location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; + long start2 = System.currentTimeMillis(); + process(data, location, after); + + long end = System.currentTimeMillis(); + if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { + if (LOG.isInfoEnabled()) { + LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); + } + } + + } finally{ + checkpointLock.readLock().unlock(); + } if (after != null) { Runnable afterCompletion = null; synchronized (orderedTransactionAfters) { @@ -1384,6 +1362,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + private void checkpointUpdate(final boolean cleanup) throws IOException { + checkpointLock.writeLock().lock(); + try { + this.indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, cleanup); + } + }); + } finally { + this.indexLock.writeLock().unlock(); + } + + } finally { + checkpointLock.writeLock().unlock(); + } + } + /** * @param tx * @throws IOException diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java new file mode 100644 index 0000000000..b2488fa215 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertTrue; + +public class AMQ4368Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class); + + private BrokerService broker; + private ActiveMQConnectionFactory connectionFactory; + private final Destination destination = new ActiveMQQueue("large_message_queue"); + private String connectionUri; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.start(); + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + + PolicyEntry policy = new PolicyEntry(); + policy.setUseCache(false); + broker.setDestinationPolicy(new PolicyMap()); + broker.getDestinationPolicy().setDefaultEntry(policy); + + KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setChecksumJournalFiles(true); + kahadb.setCheckForCorruptJournalFiles(true); + kahadb.setCleanupInterval(1000); + + kahadb.deleteAllMessages(); + broker.setPersistenceAdapter(kahadb); + broker.getSystemUsage().getMemoryUsage().setLimit(1024*1024*100); + return broker; + } + + abstract class Client implements Runnable { + private final String name; + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch doneLatch = new CountDownLatch(1); + Connection connection; + Session session; + final AtomicLong size = new AtomicLong(); + + Client(String name) { + this.name = name; + } + + public void start() { + LOG.info("Starting: " + name); + new Thread(this, name).start(); + } + + public void stopAsync() { + done.set(true); + } + + public void stop() throws InterruptedException { + stopAsync(); + if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) { + try { + connection.close(); + doneLatch.await(); + } catch (Exception e) { + } + } + } + + @Override + public void run() { + try { + connection = createConnection(); + connection.start(); + try { + session = createSession(); + work(); + } finally { + try { + connection.close(); + } catch (JMSException ignore) { + } + LOG.info("Stopped: " + name); + } + } catch (Exception e) { + e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + + protected Session createSession() throws JMSException { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected Connection createConnection() throws JMSException { + return connectionFactory.createConnection(); + } + + abstract protected void work() throws Exception; + } + + class ProducingClient extends Client { + + ProducingClient(String name) { + super(name); + } + + private String createMessage() { + StringBuffer stringBuffer = new StringBuffer(); + for (long i = 0; i < 1000000; i++) { + stringBuffer.append("1234567890"); + } + return stringBuffer.toString(); + } + + @Override + protected void work() throws Exception { + String data = createMessage(); + MessageProducer producer = session.createProducer(destination); + while (!done.get()) { + producer.send(session.createTextMessage(data)); + long i = size.incrementAndGet(); + if ((i % 1000) == 0) { + LOG.info("produced " + i + "."); + } + } + } + } + + class ConsumingClient extends Client { + + public ConsumingClient(String name) { + super(name); + } + + @Override + protected void work() throws Exception { + MessageConsumer consumer = session.createConsumer(destination); + while (!done.get()) { + Message msg = consumer.receive(100); + if (msg != null) { + size.incrementAndGet(); + } + } + } + } + + @Test + public void testENTMQ220() throws InterruptedException, JMSException { + LOG.info("Start test."); + + ProducingClient producer1 = new ProducingClient("1"); + ProducingClient producer2 = new ProducingClient("2"); + ConsumingClient listener1 = new ConsumingClient("subscriber-1"); + try { + + producer1.start(); + producer2.start(); + listener1.start(); + + long lastSize = listener1.size.get(); + for (int i = 0; i < 10; i++) { + Thread.sleep(2000); + long size = listener1.size.get(); + LOG.info("Listener 1: consumed: " + (size - lastSize)); + assertTrue("No messages received on iteration: " + i, size > lastSize); + lastSize = size; + } + } finally { + LOG.info("Stopping clients"); + producer1.stop(); + producer2.stop(); + listener1.stop(); + } + } +} \ No newline at end of file