AMQ-7002 - ensure uniqueue jobids, fix and test via patch from Jamie goodyear applied with thanks

This commit is contained in:
gtully 2018-06-27 14:14:44 +01:00
parent 919ca96cee
commit e0aa091d9e
2 changed files with 181 additions and 2 deletions

View File

@ -337,7 +337,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
getInternalScheduler().schedule(msg.getMessageId().toString(),
String jobId = ID_GENERATOR.generateId();
getInternalScheduler().schedule(jobId,
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.util.RedeliveryPlugin;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.util.List;
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.*;
public class AMQ7002Test {
static final Logger LOG = LoggerFactory.getLogger(AMQ7002Test.class);
protected ActiveMQConnection connection;
protected ActiveMQConnectionFactory connectionFactory;
private BrokerService brokerService;
private JobSchedulerStoreImpl store;
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
}
protected Connection createConnection() throws Exception {
return getConnectionFactory().createConnection();
}
public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
if (connectionFactory == null) {
connectionFactory = createConnectionFactory();
assertTrue("Should have created a connection factory!", connectionFactory != null);
}
return connectionFactory;
}
protected BrokerService createBroker() throws Exception {
File directory = new File("target/test/ScheduledJobsDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
createSchedulerStore(directory);
BrokerService service = new BrokerService();
service.setPersistent(true);
service.setUseJmx(false);
service.setJobSchedulerStore(store);
service.setSchedulerSupport(true);
service.setDeleteAllMessagesOnStartup(true);
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
brokerRedeliveryPolicy.setInitialRedeliveryDelay(60000);
brokerRedeliveryPolicy.setMaximumRedeliveries(20);
brokerRedeliveryPolicy.setMaximumRedeliveryDelay(300000);
brokerRedeliveryPolicy.setBackOffMultiplier(2);
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy);
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
service.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
service.start();
service.waitUntilStarted();
return service;
}
protected ConsumerObject getConsumer(int id) throws Exception {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setMaximumRedeliveries(0);
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
consumerConnection.setRedeliveryPolicy(redeliveryPolicy);
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("Consumer." + id + ".VirtualTopic.Orders"));
LOG.info(consumer.toString());
ConsumerObject co = new ConsumerObject(consumerSession, consumer, consumerConnection);
return co;
}
@Before
public void before() throws Exception {
brokerService = createBroker();
}
@After
public void after() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
@Test
public void TestDuplicateJobIDs() throws Exception {
ConsumerObject consumer1 = getConsumer(1);
ConsumerObject consumer2 = getConsumer(2);
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
//Session session = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination dest = session.createTopic("VirtualTopic.Orders");
MessageProducer producer = session.createProducer(dest);
TextMessage msg = session.createTextMessage("Test Me");
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(msg);
Message message1 = consumer1.getConsumer().receive();
assertNotNull("got message", message1);
LOG.info("got: " + message1);
Message message2 = consumer2.getConsumer().receive();
assertNotNull("got message", message2);
LOG.info("got: " + message2);
//Force rollback
consumer1.getSession().rollback();
consumer2.getSession().rollback();
// Check the scheduled jobs here //
Thread.sleep(2000);
JobScheduler js = brokerService.getJobSchedulerStore().getJobScheduler("JMS");
List<Job> jobList = js.getAllJobs();
assertNotNull(jobList);
assertEquals(2, jobList.size());
String jobId1 = jobList.get(0).getJobId();
String jobId2 = jobList.get(1).getJobId();
assertFalse("FAIL: JobIDs are duplicates!",jobId1.equals(jobId2));
}
private class ConsumerObject {
Session session;
MessageConsumer consumer;
Connection connection;
public ConsumerObject(Session session, MessageConsumer consumer, Connection connection) {
this.session = session;
this.consumer = consumer;
this.connection = connection;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public MessageConsumer getConsumer() {
return consumer;
}
public void setConsumer(MessageConsumer consumer) {
this.consumer = consumer;
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
}
protected void createSchedulerStore(File directory) throws Exception {
store = new JobSchedulerStoreImpl();
store.setDirectory(directory);
store.setCheckpointInterval(5000);
store.setCleanupInterval(10000);
//store.setJournalMaxFileLength(10 * 1024);
}
}