mirror of https://github.com/apache/activemq.git
Merge pull request #1075 from cshannon/AMQ-9343
AMQ-9343 - Reduce memory used for in flight transactions
This commit is contained in:
commit
961067ec19
|
@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
|||
}
|
||||
}
|
||||
}, null);
|
||||
|
||||
/*
|
||||
* After we store the command in the journal we no longer need to keep the message
|
||||
* on the command, and we can clear the field here.
|
||||
*
|
||||
* The reason to clear the message is that for messages added as part of a transaction the command
|
||||
* will be added to the inflightTransactions map as a pending add operation.
|
||||
* For long-running transactions and/or transactions with a lot of pending messages
|
||||
* (or large messages) this can use up a decent amount of memory which can increase GC pressure.
|
||||
*
|
||||
* The commands are only tracked in the map so that the KahaDB index can be updated later
|
||||
* on transaction commit, but updating the index only requires metadata from the command
|
||||
* such as message id or destination and not the message itself, so we can safely clear the field.
|
||||
*
|
||||
* Note that on broker restart and recovery of the KahaDB journal the pending message
|
||||
* adds for transactions will be loaded again and the memory won't be cleared in that case.
|
||||
* This could be revisited in the future if an issue but that should not be a large
|
||||
* issue because that's only done on first startup and during recovery and then
|
||||
* after the broker is recovered the memory footprint will drop. Also, as of now, recovering
|
||||
* XA transactions in the transaction broker requires loading the messages and acks anyway
|
||||
* for processing, so we need to load the full message and keep it in the pending operation.
|
||||
*/
|
||||
command.clearMessage();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import static junit.framework.TestCase.assertNotNull;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.MessageProducer;
|
||||
import jakarta.jms.Queue;
|
||||
import jakarta.jms.Session;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.store.kahadb.MessageDatabase.AddOperation;
|
||||
import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
|
||||
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class KahaDBInFlightTxMemoryUsageTest {
|
||||
static final Logger LOG = LoggerFactory.getLogger(KahaDBInFlightTxMemoryUsageTest.class);
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
||||
|
||||
private BrokerService broker;
|
||||
private URI brokerConnectURI;
|
||||
|
||||
private Map<TransactionId, List<Operation<?>>> inflightTransactions;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setPersistent(true);
|
||||
broker.setDataDirectoryFile(dataFileDir.getRoot());
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
|
||||
//set up a transport
|
||||
TransportConnector connector = broker
|
||||
.addConnector(new TransportConnector());
|
||||
connector.setUri(new URI("tcp://0.0.0.0:0"));
|
||||
connector.setName("tcp");
|
||||
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
|
||||
|
||||
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||
Field inflightField = MessageDatabase.class.getDeclaredField("inflightTransactions");
|
||||
inflightField.setAccessible(true);
|
||||
|
||||
inflightTransactions = (LinkedHashMap<TransactionId, List<Operation<?>>>)
|
||||
inflightField.get(adapter.getStore());
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testKahaDBInFlightTxMessagesClearedFromMemory() throws JMSException {
|
||||
final String queueName = "test.queue";
|
||||
|
||||
final ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory(brokerConnectURI)
|
||||
.createConnection();
|
||||
// sync send so messages are immediately sent for testing, normally transacted sessions async send
|
||||
connection.setAlwaysSyncSend(true);
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
|
||||
try {
|
||||
// Should be empty before any sends
|
||||
assertTrue(inflightTransactions.isEmpty());
|
||||
|
||||
// Send 10 transacted messages but don't commit so they are pending
|
||||
MessageProducer prod = session.createProducer(queue);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
prod.send(session.createTextMessage("test"));
|
||||
}
|
||||
|
||||
// Check the inflight transactions map to verify the pending operations had messages cleared
|
||||
assertEquals(inflightTransactions.size(), 1);
|
||||
List<Operation<?>> pendingOps = inflightTransactions.values().stream().findFirst().orElseThrow();
|
||||
assertEquals(10, pendingOps.size());
|
||||
|
||||
for (Operation<?> pendingOp : pendingOps) {
|
||||
// all 10 ops should be AddOperation
|
||||
assertTrue(pendingOp instanceof AddOperation);
|
||||
KahaAddMessageCommand command = ((AddOperation)pendingOp).getCommand();
|
||||
assertNotNull(pendingOp.getLocation());
|
||||
assertNotNull(command);
|
||||
assertNotNull(command.getMessageId());
|
||||
assertNotNull(command.getDestination());
|
||||
|
||||
// Message should now be null when in the pending list
|
||||
assertNull(command.getMessage());
|
||||
}
|
||||
|
||||
// assert cleared after successful commit
|
||||
session.commit();
|
||||
assertTrue(inflightTransactions.isEmpty());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue