mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-01 06:49:25 +00:00
ARTEMIS-4768 _AMQ_SCHED_DELIVERY msg prop lost after broker restart
This commit is contained in:
parent
e13d65b16d
commit
7e151ee1ce
@ -231,22 +231,18 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||||||
try {
|
try {
|
||||||
long scheduledDeliveryTime = record.getScheduledDeliveryTime();
|
long scheduledDeliveryTime = record.getScheduledDeliveryTime();
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime) {
|
|
||||||
scheduledDeliveryTime = 0;
|
|
||||||
record.getMessage().setScheduledDeliveryTime(0L);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0) {
|
if (scheduledDeliveryTime != 0) {
|
||||||
|
if (scheduledDeliveryTime <= currentTime) {
|
||||||
|
// scheduled delivery time already passed while the broker wasn't running
|
||||||
|
record.getMessage().setScheduledDeliveryTime(0L);
|
||||||
|
} else {
|
||||||
record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime);
|
record.getMessage().setScheduledDeliveryTime(scheduledDeliveryTime);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MessageReference ref = postOffice.reload(record.getMessage(), queue, null);
|
MessageReference ref = postOffice.reload(record.getMessage(), queue, null);
|
||||||
|
|
||||||
ref.setDeliveryCount(record.getDeliveryCount());
|
ref.setDeliveryCount(record.getDeliveryCount());
|
||||||
|
|
||||||
if (scheduledDeliveryTime != 0) {
|
|
||||||
record.getMessage().setScheduledDeliveryTime(0L);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
ActiveMQServerLogger.LOGGER.unableToLoadMessageFromJournal(t);
|
ActiveMQServerLogger.LOGGER.unableToLoadMessageFromJournal(t);
|
||||||
continue;
|
continue;
|
||||||
|
@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ScheduledMessageRestartTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server = createServer(true, false);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulePropertyExistsAfterRestart() throws Exception {
|
||||||
|
final String queueName = RandomUtil.randomString();
|
||||||
|
final long scheduledTime = System.currentTimeMillis() * 2;
|
||||||
|
server.createQueue(new QueueConfiguration(queueName).setAddress(queueName));
|
||||||
|
ServerLocator locator = createInVMLocator(0);
|
||||||
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
|
ClientSession session = factory.createSession();
|
||||||
|
ClientProducer producer = session.createProducer(queueName);
|
||||||
|
ClientMessage m = session.createMessage(true);
|
||||||
|
m.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime);
|
||||||
|
producer.send(m);
|
||||||
|
locator.close();
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
List<MessageReference> scheduledMessages = server.locateQueue(queueName).getScheduledMessages();
|
||||||
|
assertEquals(1, scheduledMessages.size());
|
||||||
|
Message serverMessage = scheduledMessages.get(0).getMessage();
|
||||||
|
assertTrue(serverMessage.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME));
|
||||||
|
assertEquals(scheduledTime, serverMessage.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME).longValue());
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user