mirror of https://github.com/apache/activemq.git
AMQ-9343 - Reduce memory used for in flight transactions
This commit will reduce the memory required in KahaDB for long running transactions and transactions with a lot of pending message sends by clearing out the message memory when no longer needed instead of keeping it tracked in the pending map
This commit is contained in:
parent
07cce0609b
commit
c3bef84be5
|
@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, null);
|
}, null);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* After we store the command in the journal we no longer need to keep the message
|
||||||
|
* on the command, and we can clear the field here.
|
||||||
|
*
|
||||||
|
* The reason to clear the message is that for messages added as part of a transaction the command
|
||||||
|
* will be added to the inflightTransactions map as a pending add operation.
|
||||||
|
* For long-running transactions and/or transactions with a lot of pending messages
|
||||||
|
* (or large messages) this can use up a decent amount of memory which can increase GC pressure.
|
||||||
|
*
|
||||||
|
* The commands are only tracked in the map so that the KahaDB index can be updated later
|
||||||
|
* on transaction commit, but updating the index only requires metadata from the command
|
||||||
|
* such as message id or destination and not the message itself, so we can safely clear the field.
|
||||||
|
*
|
||||||
|
* Note that on broker restart and recovery of the KahaDB journal the pending message
|
||||||
|
* adds for transactions will be loaded again and the memory won't be cleared in that case.
|
||||||
|
* This could be revisited in the future if an issue but that should not be a large
|
||||||
|
* issue because that's only done on first startup and during recovery and then
|
||||||
|
* after the broker is recovered the memory footprint will drop. Also, as of now, recovering
|
||||||
|
* XA transactions in the transaction broker requires loading the messages and acks anyway
|
||||||
|
* for processing, so we need to load the full message and keep it in the pending operation.
|
||||||
|
*/
|
||||||
|
command.clearMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import jakarta.jms.JMSException;
|
||||||
|
import jakarta.jms.MessageProducer;
|
||||||
|
import jakarta.jms.Queue;
|
||||||
|
import jakarta.jms.Session;
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.command.TransactionId;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.AddOperation;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
|
||||||
|
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class KahaDBInFlightTxMemoryUsageTest {
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(KahaDBInFlightTxMemoryUsageTest.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
||||||
|
|
||||||
|
private BrokerService broker;
|
||||||
|
private URI brokerConnectURI;
|
||||||
|
|
||||||
|
private Map<TransactionId, List<Operation<?>>> inflightTransactions;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Before
|
||||||
|
public void startBroker() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setPersistent(true);
|
||||||
|
broker.setDataDirectoryFile(dataFileDir.getRoot());
|
||||||
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
|
|
||||||
|
//set up a transport
|
||||||
|
TransportConnector connector = broker
|
||||||
|
.addConnector(new TransportConnector());
|
||||||
|
connector.setUri(new URI("tcp://0.0.0.0:0"));
|
||||||
|
connector.setName("tcp");
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
|
||||||
|
|
||||||
|
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||||
|
Field inflightField = MessageDatabase.class.getDeclaredField("inflightTransactions");
|
||||||
|
inflightField.setAccessible(true);
|
||||||
|
|
||||||
|
inflightTransactions = (LinkedHashMap<TransactionId, List<Operation<?>>>)
|
||||||
|
inflightField.get(adapter.getStore());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKahaDBInFlightTxMessagesClearedFromMemory() throws JMSException {
|
||||||
|
final String queueName = "test.queue";
|
||||||
|
|
||||||
|
final ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory(brokerConnectURI)
|
||||||
|
.createConnection();
|
||||||
|
// sync send so messages are immediately sent for testing, normally transacted sessions async send
|
||||||
|
connection.setAlwaysSyncSend(true);
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Should be empty before any sends
|
||||||
|
assertTrue(inflightTransactions.isEmpty());
|
||||||
|
|
||||||
|
// Send 10 transacted messages but don't commit so they are pending
|
||||||
|
MessageProducer prod = session.createProducer(queue);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
prod.send(session.createTextMessage("test"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the inflight transactions map to verify the pending operations had messages cleared
|
||||||
|
assertEquals(inflightTransactions.size(), 1);
|
||||||
|
List<Operation<?>> pendingOps = inflightTransactions.values().stream().findFirst().orElseThrow();
|
||||||
|
assertEquals(10, pendingOps.size());
|
||||||
|
|
||||||
|
for (Operation<?> pendingOp : pendingOps) {
|
||||||
|
// all 10 ops should be AddOperation
|
||||||
|
assertTrue(pendingOp instanceof AddOperation);
|
||||||
|
KahaAddMessageCommand command = ((AddOperation)pendingOp).getCommand();
|
||||||
|
assertNotNull(pendingOp.getLocation());
|
||||||
|
assertNotNull(command);
|
||||||
|
assertNotNull(command.getMessageId());
|
||||||
|
assertNotNull(command.getDestination());
|
||||||
|
|
||||||
|
// Message should now be null when in the pending list
|
||||||
|
assertNull(command.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// assert cleared after successful commit
|
||||||
|
session.commit();
|
||||||
|
assertTrue(inflightTransactions.isEmpty());
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue