Merge pull request #521 from coheigea/AMQ-7458

AMQ-7458 - Implement bounds checking on the message scheduling proper…
This commit is contained in:
Jean-Baptiste Onofré 2020-05-15 15:52:23 +02:00 committed by GitHub
commit 41bef94293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 240 additions and 18 deletions

View File

@ -143,6 +143,7 @@ public class BrokerService implements Service {
public static final String DEFAULT_BROKER_NAME = "localhost";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final long DEFAULT_START_TIMEOUT = 600000L;
public static final int MAX_SCHEDULER_REPEAT_ALLOWED = 1000;
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
@ -238,6 +239,7 @@ public class BrokerService implements Service {
private boolean forceStart = false;
private IOExceptionHandler ioExceptionHandler;
private boolean schedulerSupport = false;
private int maxSchedulerRepeatAllowed = MAX_SCHEDULER_REPEAT_ALLOWED;
private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
@ -2460,6 +2462,7 @@ public class BrokerService implements Service {
protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
@ -3295,4 +3298,12 @@ public class BrokerService implements Service {
public boolean isRollbackOnlyOnAsyncException() {
return rollbackOnlyOnAsyncException;
}
public int getMaxSchedulerRepeatAllowed() {
return maxSchedulerRepeatAllowed;
}
public void setMaxSchedulerRepeatAllowed(int maxSchedulerRepeatAllowed) {
this.maxSchedulerRepeatAllowed = maxSchedulerRepeatAllowed;
}
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.scheduler;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.MessageFormatException;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
@ -56,6 +58,10 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private static final LongSequenceGenerator longGenerator = new LongSequenceGenerator();
/**
* The max repeat value allowed to prevent clients from causing DoS issues with huge repeat counts
*/
private static final int MAX_REPEAT_ALLOWED = 1000;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
@ -65,6 +71,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final JobSchedulerStore store;
private JobScheduler scheduler;
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
@ -336,6 +343,9 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
if (repeat > maxRepeatAllowed) {
throw new MessageFormatException("The scheduled repeat value is too large");
}
}
//job id should be unique for every job (Same format as MessageId)
@ -357,6 +367,9 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
int repeat = 0;
if (repeatValue != null) {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
if (repeat > maxRepeatAllowed) {
throw new MessageFormatException("The scheduled repeat value is too large");
}
}
if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
@ -456,4 +469,12 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
}
}
public int getMaxRepeatAllowed() {
return maxRepeatAllowed;
}
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
this.maxRepeatAllowed = maxRepeatAllowed;
}
}

View File

@ -543,27 +543,28 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
}
}
protected void checkValidScheduled(String name, Object value) throws MessageFormatException {
if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
if (value instanceof Long == false && value instanceof Integer == false) {
throw new MessageFormatException(name + " should be long or int value");
}
}
if (AMQ_SCHEDULED_CRON.equals(name)) {
CronParser.validate(value.toString());
}
}
protected Object convertScheduled(String name, Object value) throws MessageFormatException {
Object result = value;
if (AMQ_SCHEDULED_DELAY.equals(name)){
result = TypeConversionSupport.convert(value, Long.class);
if (result != null && (Long)result < 0) {
throw new MessageFormatException(name + " must not be a negative value");
}
}
else if (AMQ_SCHEDULED_PERIOD.equals(name)){
result = TypeConversionSupport.convert(value, Long.class);
if (result != null && (Long)result < 0) {
throw new MessageFormatException(name + " must not be a negative value");
}
}
else if (AMQ_SCHEDULED_REPEAT.equals(name)){
result = TypeConversionSupport.convert(value, Integer.class);
if (result != null && (Integer)result < 0) {
throw new MessageFormatException(name + " must not be a negative value");
}
}
else if (AMQ_SCHEDULED_CRON.equals(name)) {
CronParser.validate(value.toString());
}
return result;
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import javax.jms.MessageFormatException;
import org.apache.activemq.ScheduledMessage;
import org.junit.Test;
public class ScheduledValuesTest {
@Test
public void testNegativeDelay() throws Exception {
ActiveMQMessage message = new ActiveMQMessage();
try {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, -1L);
fail("Failure expected on a negative value");
} catch (MessageFormatException ex) {
assertEquals("AMQ_SCHEDULED_DELAY must not be a negative value", ex.getMessage());
}
}
@Test
public void testNegativeRepeat() throws Exception {
ActiveMQMessage message = new ActiveMQMessage();
try {
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
fail("Failure expected on a negative value");
} catch (MessageFormatException ex) {
assertEquals("AMQ_SCHEDULED_REPEAT must not be a negative value", ex.getMessage());
}
}
@Test
public void testNegativePeriod() throws Exception {
ActiveMQMessage message = new ActiveMQMessage();
try {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, -1L);
fail("Failure expected on a negative value");
} catch (MessageFormatException ex) {
assertEquals("AMQ_SCHEDULED_PERIOD must not be a negative value", ex.getMessage());
}
}
@Test
public void testScheduledDelayViaCron() throws Exception {
ActiveMQMessage message = new ActiveMQMessage();
try {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "-1 * * * *");
fail("Failure expected on a negative value");
} catch (NumberFormatException ex) {
// expected
}
}
}

View File

@ -56,10 +56,10 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
JobSchedulerViewMBean view = getJobSchedulerMBean();
assertNotNull(view);
assertTrue(view.getAllJobs().isEmpty());
scheduleMessage(60000, -1, -1);
scheduleMessage(60000, 0, 0);
assertFalse(view.getAllJobs().isEmpty());
assertEquals(1, view.getAllJobs().size());
scheduleMessage(60000, -1, -1);
scheduleMessage(60000, 0, 0);
assertEquals(2, view.getAllJobs().size());
}
@ -68,7 +68,7 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
JobSchedulerViewMBean view = getJobSchedulerMBean();
assertNotNull(view);
assertTrue(view.getAllJobs().isEmpty());
scheduleMessage(60000, -1, -1);
scheduleMessage(60000, 0, 0);
assertFalse(view.getAllJobs().isEmpty());
TabularData jobs = view.getAllJobs();
assertEquals(1, jobs.size());
@ -85,7 +85,7 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
JobSchedulerViewMBean view = getJobSchedulerMBean();
assertNotNull(view);
assertTrue(view.getAllJobs().isEmpty());
scheduleMessage(60000, -1, -1);
scheduleMessage(60000, 0, 0);
assertFalse(view.getAllJobs().isEmpty());
String now = JobSupport.getDateTime(System.currentTimeMillis());
String later = JobSupport.getDateTime(System.currentTimeMillis() + 120 * 1000);
@ -98,7 +98,7 @@ public class JobSchedulerJmxManagementTests extends JobSchedulerTestSupport {
JobSchedulerViewMBean view = getJobSchedulerMBean();
assertNotNull(view);
assertTrue(view.getAllJobs().isEmpty());
scheduleMessage(60000, -1, -1);
scheduleMessage(60000, 0, 0);
assertFalse(view.getAllJobs().isEmpty());
long before = System.currentTimeMillis() + 57 * 1000;
long toLate = System.currentTimeMillis() + 63 * 1000;

View File

@ -164,7 +164,7 @@ public class KahaDBSchedulerIndexRebuildTest {
long time = 360 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
producer.send(message);
producer.close();
}

View File

@ -179,7 +179,7 @@ public class KahaDBSchedulerMissingJournalLogsTest {
long time = 360 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
producer.send(message);
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.scheduler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.net.ServerSocket;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
public class SchedulerRepeatTest {
private static BrokerService broker;
private static String brokerAddress;
@org.junit.BeforeClass
public static void startBroker() throws Exception {
broker = new BrokerService();
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.setJobSchedulerStore(new InMemoryJobSchedulerStore());
broker.setDataDirectory("target/activemq-data");
broker.setSchedulerSupport(true);
ServerSocket serverSocket = new ServerSocket(0);
int brokerPort = serverSocket.getLocalPort();
serverSocket.close();
brokerAddress = "tcp://localhost:" + brokerPort;
broker.addConnector(brokerAddress);
broker.start();
}
@org.junit.AfterClass
public static void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
@org.junit.Test
public void testSendLotsofMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerAddress);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("testqueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Some txt");
message.setStringProperty("some header", "some value");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0L);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0L);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 2000);
try {
producer.send(message);
fail("Failure expected on too large a repeat value");
} catch (MessageFormatException ex) {
assertEquals("The scheduled repeat value is too large", ex.getMessage());
}
connection.close();
}
@org.junit.Test
public void testRepeat() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerAddress);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("testqueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Some txt");
message.setStringProperty("some header", "some value");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 0L);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 0L);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 900);
producer.send(message);
connection.close();
}
}