From d8f8ae9f925b905968c51b386049c5b2d9b6b2d0 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 11 Jan 2018 12:56:40 +0000 Subject: [PATCH] [AMQ-6815] rework to drop the batch reference from Location such that batches are free for gc when index pages are agressively cached (cherry picked from commit ec6fa190999160676cab900038b268b2d40a4d5c) --- .../store/kahadb/MessageDatabase.java | 4 +- .../store/kahadb/disk/journal/Location.java | 13 ++- .../DataFileAppenderNoSpaceNoBatchTest.java | 2 +- .../org/apache/activemq/bugs/AMQ6815Test.java | 95 +++++++++++++++++++ 4 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index b391de700c..94de6ea640 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2132,8 +2132,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); try { location.getLatch().await(); - if (location.getBatch().exception.get() != null) { - throw location.getBatch().exception.get(); + if (location.getException().get() != null) { + throw location.getException().get(); } } catch (InterruptedException e) { throw new InterruptedIOException(e.toString()); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java index f3da47a56e..673d9f68fd 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; /** * Used as a location in the data store. @@ -36,7 +37,8 @@ public final class Location implements Comparable { private int offset = NOT_SET; private int size = NOT_SET; private byte type = NOT_SET_TYPE; - private DataFileAppender.WriteBatch batch; + private CountDownLatch latch; + private AtomicReference exception; public Location() { } @@ -114,11 +116,12 @@ public final class Location implements Comparable { } public CountDownLatch getLatch() { - return batch.latch; + return latch; } public void setBatch(DataFileAppender.WriteBatch batch) { - this.batch = batch; + this.latch = batch.latch; + this.exception = batch.exception; } public int compareTo(Location o) { @@ -142,7 +145,7 @@ public final class Location implements Comparable { return dataFileId ^ offset; } - public DataFileAppender.WriteBatch getBatch() { - return batch; + public AtomicReference getException() { + return exception; } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java index a6b19eeed3..6d778c3e84 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -183,7 +183,7 @@ public class DataFileAppenderNoSpaceNoBatchTest { boolean someExceptions = false; for (Location location: locations) { - someExceptions |= (location.getBatch().exception != null); + someExceptions |= (location.getException().get() != null); } assertTrue(someExceptions); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java new file mode 100644 index 0000000000..0b41195962 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertTrue; + +public class AMQ6815Test { + static final Logger LOG = LoggerFactory.getLogger(AMQ6815Test.class); + private final static int MEM_LIMIT = 5*1024*1024; + private final static byte[] payload = new byte[5*1024]; + + protected BrokerService brokerService; + protected Connection connection; + protected Session session; + protected Queue amqDestination; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(MEM_LIMIT); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + brokerService.setDestinationPolicy(pMap); + + brokerService.start(); + connection = new ActiveMQConnectionFactory("vm://localhost").createConnection(); + connection.start(); + session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE); + amqDestination = session.createQueue("QQ"); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + brokerService.stop(); + } + + @Test(timeout = 60000) + public void testHeapUsage() throws Exception { + Runtime.getRuntime().gc(); + final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + sendMessages(10000); + Runtime.getRuntime().gc(); + long usedMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() - initUsedMemory; + LOG.info("Mem in use: " + usedMem/1024 + "K"); + assertTrue("Used Mem reasonable " + usedMem, usedMem < 5*MEM_LIMIT); + } + + protected void sendMessages(int count) throws JMSException { + MessageProducer producer = session.createProducer(amqDestination); + for (int i = 0; i < count; i++) { + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(payload); + producer.send(bytesMessage); + } + producer.close(); + } + +}