AMQ-7196 - During startup ActiveMq load all the scheduleDB.data on memory causing OOM

This commit is contained in:
Alan Protasio 2019-05-08 12:00:42 -07:00
parent 7404b43f2d
commit b56819123b
4 changed files with 122 additions and 22 deletions

View File

@ -657,22 +657,35 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
* @param tx
* the transaction under which this operation was invoked.
*
* @return a list of all referenced Location values for this JobSchedulerImpl
* @return a iterator of all referenced Location values for this JobSchedulerImpl
*
* @throws IOException if an error occurs walking the scheduler tree.
*/
protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
List<JobLocation> references = new ArrayList<>();
protected Iterator<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
return new Iterator<JobLocation>() {
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
List<JobLocation> scheduled = entry.getValue();
for (JobLocation job : scheduled) {
references.add(job);
final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator = index.iterator(tx);
Iterator<JobLocation> iterator;
@Override
public boolean hasNext() {
while (iterator == null || !iterator.hasNext()) {
if (!mapIterator.hasNext()) {
break;
}
iterator = new ArrayList<>(mapIterator.next().getValue()).iterator();
}
return iterator != null && iterator.hasNext();
}
}
return references;
@Override
public JobLocation next() {
return iterator.next();
}
};
}
@Override

View File

@ -826,8 +826,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
for (JobLocation job : jobs) {
for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
final JobLocation job = jobLocationIterator.next();
if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
@ -854,8 +854,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
for (JobLocation job : jobs) {
for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
final JobLocation job = jobLocationIterator.next();
missingJournalFiles.add(job.getLocation().getDataFileId());
if (job.getLastUpdate() != null) {
missingJournalFiles.add(job.getLastUpdate().getDataFileId());
@ -937,8 +937,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
for (JobLocation job : jobs) {
for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
final JobLocation job = jobLocationIterator.next();
// Remove all jobs in missing log files.
if (missing.contains(job.getLocation().getDataFileId())) {

View File

@ -35,8 +35,14 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -201,6 +207,69 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
@Test
public void testScheduleRestart() throws Exception {
testScheduleRestart(RestartType.NORMAL);
}
@Test
public void testScheduleFullRecoveryRestart() throws Exception {
testScheduleRestart(RestartType.FULL_RECOVERY);
}
@Test
public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
final int NUMBER_OF_MESSAGES = 1000;
final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
Location middleLocation = null;
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
numberOfDiscardedJobs.incrementAndGet();
}
}
};
registerLogAppender(appender);
// send a messages
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = session.createTextMessage("test msg");
long time = 5000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);
if (NUMBER_OF_MESSAGES / 2 == i) {
middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
}
}
producer.close();
broker.stop();
broker.waitUntilStopped();
// Simulating the case here updates got applied on the index before the journal updates
jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
jobSchedulerStore.load();
assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2);
}
private void registerLogAppender(final Appender appender) {
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
log4jLogger.addAppender(appender);
log4jLogger.setLevel(Level.TRACE);
}
private void testScheduleRestart(final RestartType restartType) throws Exception {
// send a message
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -213,12 +282,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
producer.close();
//restart broker
broker.stop();
broker.waitUntilStopped();
broker = createBroker(false);
broker.start();
broker.waitUntilStarted();
restartBroker(restartType);
// consume the message
connection = createConnection();

View File

@ -41,6 +41,11 @@ public class JobSchedulerTestSupport {
@Rule public TestName name = new TestName();
enum RestartType {
NORMAL,
FULL_RECOVERY
}
protected String connectionUri;
protected BrokerService broker;
protected JobScheduler jobScheduler;
@ -113,4 +118,22 @@ public class JobSchedulerTestSupport {
answer.setUseJmx(isUseJmx());
return answer;
}
protected void restartBroker(RestartType restartType) throws Exception {
tearDown();
if (restartType == RestartType.FULL_RECOVERY) {
File dir = broker.getSchedulerDirectoryFile();
if (dir != null) {
IOHelper.deleteFile(new File(dir, "scheduleDB.data"));
IOHelper.deleteFile(new File(dir, "scheduleDB.redo"));
}
}
broker = createBroker(false);
broker.start();
broker.waitUntilStarted();
}
}