This commit is contained in:
Justin Bertram 2021-09-29 13:39:59 -05:00
commit 98a3af8383
2 changed files with 21 additions and 12 deletions

View File

@ -1759,6 +1759,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222303, value = "Redistribution by {0} of messageID = {1} failed", format = Message.Format.MESSAGE_FORMAT)
void errorRedistributing(@Cause Throwable t, String queueName, long m);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222304, value = "Unable to load message from journal", format = Message.Format.MESSAGE_FORMAT)
void unableToLoadMessageFromJournal(@Cause Throwable t);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -221,23 +221,28 @@ public class PostOfficeJournalLoader implements JournalLoader {
long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords) {
long scheduledDeliveryTime = record.getScheduledDeliveryTime();
try {
long scheduledDeliveryTime = record.getScheduledDeliveryTime();
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) {
scheduledDeliveryTime = 0;
record.getMessage().setScheduledDeliveryTime(0L);
}
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) {
scheduledDeliveryTime = 0;
record.getMessage().setScheduledDeliveryTime(0L);
}
if (scheduledDeliveryTime != 0) {
record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime);
}
if (scheduledDeliveryTime != 0) {
record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime);
}
MessageReference ref = postOffice.reload(record.getMessage(), queue, null);
MessageReference ref = postOffice.reload(record.getMessage(), queue, null);
ref.setDeliveryCount(record.getDeliveryCount());
ref.setDeliveryCount(record.getDeliveryCount());
if (scheduledDeliveryTime != 0) {
record.getMessage().setScheduledDeliveryTime(0L);
if (scheduledDeliveryTime != 0) {
record.getMessage().setScheduledDeliveryTime(0L);
}
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.unableToLoadMessageFromJournal(t);
continue;
}
}
}