AMQ-7118 This closes #327 - with thanks to Heath Kesler

jgoodyear 2018-12-04 12:24:24 -03:30
parent 6d4459a00c
commit 612d4aeeb4
3 changed files with 349 additions and 1 deletion
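
The change adds a transient AtomicBoolean, ackMessageFileMapDirtyFlag, to KahaDB's MessageDatabase. Every path that mutates metadata.ackMessageFileMap (recovery, recording ack references, journal GC, ack compaction) flips the flag, and checkpointUpdate() now calls checkpointAckMessageFileMap() only when the flag is set or the map has never been written, clearing the flag afterwards. A minimal standalone sketch of that guard follows; the class and method names in it are made up for illustration, only the flag-and-guard shape comes from the diff below.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the dirty-flag guard introduced by this commit.
// AckMapCheckpointSketch and writeAckMapRecord() are hypothetical names;
// the real logic lives in MessageDatabase.checkpointUpdate().
public class AckMapCheckpointSketch {

    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private Object lastWrittenLocation; // stands in for the journal Location of the last write

    // Called from every path that mutates the in-memory ack map.
    void markAckMapChanged() {
        // The diff uses lazySet(); the cheaper ordered write is presumably fine
        // because the mutators and the checkpoint both run under the index lock.
        dirty.lazySet(true);
    }

    // Called from the periodic checkpoint.
    void checkpoint() {
        // Rewrite the ack-map record only if it changed since the last write,
        // or if it has never been written at all.
        if (dirty.get() || lastWrittenLocation == null) {
            lastWrittenLocation = writeAckMapRecord();
        }
        dirty.lazySet(false);
    }

    private Object writeAckMapRecord() {
        // In MessageDatabase this serializes metadata.ackMessageFileMap into the journal.
        return new Object();
    }
}

With the guard in place, a checkpoint on an unchanged map appends nothing new to the journal, which is what the repeated checkFiles(true, 23, "db-23.log") calls in the test below assert.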

MessageDatabase.java

@@ -148,6 +148,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected Location ackMessageFileMapLocation = null;
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
protected transient AtomicBoolean ackMessageFileMapDirtyFlag = new AtomicBoolean(false);
protected int version = VERSION;
protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
@@ -825,6 +826,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
requiresReplay = false;
} catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e);
@@ -1631,6 +1633,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
referenceFileIds = new HashSet<>();
referenceFileIds.add(messageLocation.getDataFileId());
metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} else {
Integer id = Integer.valueOf(messageLocation.getDataFileId());
if (!referenceFileIds.contains(id)) {
@@ -1757,7 +1761,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.state = OPEN_STATE;
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
if (metadata.ackMessageFileMapDirtyFlag.get() || (metadata.ackMessageFileMapLocation == null)) {
metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
}
metadata.ackMessageFileMapDirtyFlag.lazySet(false);
Location[] inProgressTxRange = getInProgressTxLocationRange();
metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
tx.store(metadata.page, metadataMarshaller, true);
@@ -1928,6 +1935,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
if (gcCandidateSet.contains(candidate)) {
ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("not removing data file: " + candidate
@@ -1942,6 +1950,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (Integer candidate : gcCandidateSet) {
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
ackMessageFileMapMod |= ackFiles.remove(candidate);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
}
}
if (ackMessageFileMapMod) {
@@ -2146,6 +2155,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
referenceFileIds = new HashSet<>();
referenceFileIds.addAll(entry.getValue());
metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} else {
referenceFileIds.addAll(entry.getValue());
}
@@ -2154,6 +2164,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// remove the old location data from the ack map so that the old journal log file can
// be removed on next GC.
metadata.ackMessageFileMap.remove(journalToRead);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
indexLock.writeLock().unlock();

AMQ7118Test.java (new file)

@@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class AMQ7118Test {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ7118Test.class);
protected static Random r = new Random();
final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
protected BrokerService broker;
protected Connection producerConnection;
protected Session pSession;
protected Connection cConnection;
protected Session cSession;
private final String xbean = "xbean:";
private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7118";
int checkpointIndex = 0;
private static final ActiveMQConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);
@Before
public void setup() throws Exception {
deleteData(new File("target/data"));
createBroker();
}
@After
public void shutdown() throws Exception {
broker.stop();
}
public void setupProducerConnection() throws Exception {
producerConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
producerConnection.start();
pSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void setupConsumerConnection() throws Exception {
cConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
cConnection.setClientID("myClient1");
cConnection.start();
cSession = cConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
private void createBroker() throws Exception {
broker = new BrokerService();
broker = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
broker.start();
}
@Test
public void testCompaction() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
setupProducerConnection();
setupConsumerConnection();
Topic topic = pSession.createTopic("test");
MessageConsumer consumer = cSession.createDurableSubscriber(topic, "clientId1");
LOG.info("Produce message to test topic");
produce(pSession, topic, 1, 512 ); // just one message
LOG.info("Consume message from test topic");
Message msg = consumer.receive(5000);
assertNotNull(msg);
LOG.info("Produce more messages to test topic and get into PFC");
boolean sent = produce(cSession, topic, 20, 512 * 1024); // Fill the store
assertFalse("Never got to PFC condition", sent);
LOG.info("PFC hit");
//We hit PFC, so shut down the producer
producerConnection.close();
//Lets check the db-*.log file count before checkpointUpdate
checkFiles(false, 21, "db-21.log");
// Force checkFiles update
checkFiles(true, 23, "db-23.log");
//The ackMessageFileMap should be clean, so no more writing
checkFiles(true, 23, "db-23.log");
//One more time just to be sure - The ackMessageFileMap should be clean, so no more writing
checkFiles(true, 23, "db-23.log");
//Read out the rest of the messages
LOG.info("Consuming the rest of the files...");
for (int i = 0; i < 20; i++) {
msg = consumer.receive(5000);
}
LOG.info("All messages Consumed.");
//Clean up the log files and be sure its stable
checkFiles(true, 2, "db-33.log");
checkFiles(true, 3, "db-34.log");
checkFiles(true, 2, "db-34.log");
checkFiles(true, 2, "db-34.log");
checkFiles(true, 2, "db-34.log");
broker.stop();
broker.waitUntilStopped();
}
protected static boolean produce(Session session, Topic topic, int messageCount, int messageSize) throws JMSException {
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < messageCount; i++) {
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", messageSize));
try {
producer.send(helloMessage);
} catch (ResourceAllocationException e){
return false;
}
}
return true;
}
private void deleteData(File file) {
String[] entries = file.list();
if (entries == null) return;
for (String s : entries) {
File currentFile = new File(file.getPath(), s);
if (currentFile.isDirectory()) {
deleteData(currentFile);
}
currentFile.delete();
}
file.delete();
}
private void checkFiles(boolean doCheckpoint, int expectedCount, String lastFileName) throws Exception {
File dbfiles = new File("target/data/kahadb");
FilenameFilter lff = new FilenameFilter(){
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().startsWith("db-") && name.toLowerCase().endsWith("log");
}
};
if(doCheckpoint) {
LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " ...");
broker.getPersistenceAdapter().checkpoint(true);
TimeUnit.SECONDS.sleep(2);
LOG.info("Checkpoint complete.");
}
File files[] = dbfiles.listFiles(lff);
Arrays.sort(files, new DBFileComparator() );
logfiles(files);
assertEquals(expectedCount, files.length);
assertEquals(lastFileName, files[files.length-1].getName());
}
private void logfiles(File[] files){
LOG.info("Files found in KahaDB:");
for (File file : files) {
LOG.info(" " + file.getName());
}
}
private class DBFileComparator implements Comparator<File> {
@Override
public int compare(File o1, File o2) {
int n1 = extractNumber(o1.getName());
int n2 = extractNumber(o2.getName());
return n1 - n2;
}
private int extractNumber(String name) {
int i = 0;
try {
int s = name.indexOf('-')+1;
int e = name.lastIndexOf('.');
String number = name.substring(s, e);
i = Integer.parseInt(number);
} catch(Exception e) {
i = 0; // if filename does not match the format
// then default to 0
}
return i;
}
}
}
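
checkFiles() forces a checkpoint and then sleeps a fixed two seconds before counting db-*.log files. A hedged, JDK-only alternative (not part of the commit; names invented here) would be to poll the journal directory until the count stops changing, which makes the assertion less sensitive to slow machines:

import java.io.File;
import java.io.FilenameFilter;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: waits until the number of db-*.log files is the same
// across two consecutive samples, or gives up after the timeout.
public class JournalCountWaiter {

    static final FilenameFilter JOURNAL_FILES = new FilenameFilter() {
        @Override
        public boolean accept(File dir, String name) {
            return name.startsWith("db-") && name.endsWith(".log");
        }
    };

    static int waitForStableJournalCount(File kahaDbDir, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        int previous = -1;
        while (System.currentTimeMillis() < deadline) {
            String[] names = kahaDbDir.list(JOURNAL_FILES);
            int current = (names == null) ? 0 : names.length;
            if (current == previous) {
                return current; // unchanged between samples; treat the journal as settled
            }
            previous = current;
            TimeUnit.MILLISECONDS.sleep(200);
        }
        return previous;
    }
}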

src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml (new file)

@@ -0,0 +1,109 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/data">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="target/data/kahadb" journalMaxFileLength="1k"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="10 mb" total="10000000"/>
</storeUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
</beans>
<!-- END SNIPPET: example -->
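
The configuration above keeps the store deliberately small: journalMaxFileLength="1k" means even a few 512 KB test messages spread across many db-*.log files, the 10 mb storeUsage limit fills quickly, and sendFailIfNoSpace="true" turns producer flow control into the ResourceAllocationException that produce() catches. Below is a rough programmatic equivalent of the persistence and usage settings, purely for illustration; the test itself boots the broker from the xbean URI, not from this code.

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

// Illustrative sketch only; mirrors the persistenceAdapter and systemUsage
// sections of the XML above using the programmatic broker API.
public class Amq7118BrokerSketch {

    public static BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("localhost");
        broker.setDataDirectory("target/data");

        // Tiny journal files so the test rolls many db-*.log files quickly.
        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        kahaDB.setDirectory(new File("target/data/kahadb"));
        kahaDB.setJournalMaxFileLength(1024);
        broker.setPersistenceAdapter(kahaDB);

        // Fail sends once the 10 MB store limit is hit, which is how the test
        // detects producer flow control (ResourceAllocationException on send).
        broker.getSystemUsage().setSendFailIfNoSpace(true);
        broker.getSystemUsage().getStoreUsage().setLimit(10 * 1024 * 1024);

        broker.addConnector("tcp://0.0.0.0:61616");
        return broker;
    }
}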