mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@924661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
af65ba7242
commit
e75affa660
|
@ -108,8 +108,9 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
String cronEntry = "";
|
String cronEntry = "";
|
||||||
Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
|
Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
|
||||||
Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
|
Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
|
||||||
|
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
|
||||||
|
|
||||||
if (cronValue != null || periodValue != null) {
|
if (cronValue != null || periodValue != null || delayValue != null) {
|
||||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
|
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(messageSend);
|
||||||
if (cronValue != null) {
|
if (cronValue != null) {
|
||||||
cronEntry = cronValue.toString();
|
cronEntry = cronValue.toString();
|
||||||
|
@ -117,7 +118,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
if (periodValue != null) {
|
if (periodValue != null) {
|
||||||
period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
|
period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
|
||||||
}
|
}
|
||||||
Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
|
|
||||||
if (delayValue != null) {
|
if (delayValue != null) {
|
||||||
delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
|
delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
|
||||||
}
|
}
|
||||||
|
@ -131,7 +131,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
||||||
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
super.send(producerExchange, messageSend);
|
super.send(producerExchange, messageSend);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -414,7 +414,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
|
||||||
}
|
}
|
||||||
|
|
||||||
checkValidObject(value);
|
checkValidObject(value);
|
||||||
checkValidScheduled(name, value);
|
value = convertScheduled(name, value);
|
||||||
PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
|
PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
|
||||||
|
|
||||||
if (setter != null && value != null) {
|
if (setter != null && value != null) {
|
||||||
|
@ -468,6 +468,20 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Object convertScheduled(String name, Object value) throws MessageFormatException {
|
||||||
|
Object result = value;
|
||||||
|
if (AMQ_SCHEDULED_DELAY.equals(name)){
|
||||||
|
result = TypeConversionSupport.convert(value, Long.class);
|
||||||
|
}
|
||||||
|
else if (AMQ_SCHEDULED_PERIOD.equals(name)){
|
||||||
|
result = TypeConversionSupport.convert(value, Long.class);
|
||||||
|
}
|
||||||
|
else if (AMQ_SCHEDULED_REPEAT.equals(name)){
|
||||||
|
result = TypeConversionSupport.convert(value, Integer.class);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public Object getObjectProperty(String name) throws JMSException {
|
public Object getObjectProperty(String name) throws JMSException {
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
throw new NullPointerException("Property name cannot be null");
|
throw new NullPointerException("Property name cannot be null");
|
||||||
|
|
|
@ -93,7 +93,7 @@ public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
TextMessage message = session.createTextMessage("test msg");
|
TextMessage message = session.createTextMessage("test msg");
|
||||||
|
|
||||||
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, time);
|
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
producer.close();
|
producer.close();
|
||||||
|
|
|
@ -16,15 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
@ -44,6 +35,15 @@ import javax.jms.ObjectMessage;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
public class StompTest extends CombinationTestSupport {
|
public class StompTest extends CombinationTestSupport {
|
||||||
private static final Log LOG = LogFactory.getLog(StompTest.class);
|
private static final Log LOG = LogFactory.getLog(StompTest.class);
|
||||||
|
@ -58,7 +58,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private Session session;
|
private Session session;
|
||||||
private ActiveMQQueue queue;
|
private ActiveMQQueue queue;
|
||||||
private String xmlObject = "<pojo>\n"
|
private final String xmlObject = "<pojo>\n"
|
||||||
+ " <name>Dejan</name>\n"
|
+ " <name>Dejan</name>\n"
|
||||||
+ " <city>Belgrade</city>\n"
|
+ " <city>Belgrade</city>\n"
|
||||||
+ "</pojo>";
|
+ "</pojo>";
|
||||||
|
@ -74,7 +74,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
+ " </entry>\n"
|
+ " </entry>\n"
|
||||||
+ "</map>\n";
|
+ "</map>\n";
|
||||||
|
|
||||||
private String jsonObject = "{\"pojo\":{"
|
private final String jsonObject = "{\"pojo\":{"
|
||||||
+ "\"name\":\"Dejan\","
|
+ "\"name\":\"Dejan\","
|
||||||
+ "\"city\":\"Belgrade\""
|
+ "\"city\":\"Belgrade\""
|
||||||
+ "}}";
|
+ "}}";
|
||||||
|
@ -86,6 +86,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
+ "]"
|
+ "]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
// The order of the entries is different when using ibm jdk 5.
|
// The order of the entries is different when using ibm jdk 5.
|
||||||
if (System.getProperty("java.vendor").equals("IBM Corporation")
|
if (System.getProperty("java.vendor").equals("IBM Corporation")
|
||||||
|
@ -132,6 +133,7 @@ public class StompTest extends CombinationTestSupport {
|
||||||
return getClass().getName() + "." + getName();
|
return getClass().getName() + "." + getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
try {
|
try {
|
||||||
connection.close();
|
connection.close();
|
||||||
|
@ -244,6 +246,26 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertEquals("bar", "123", message.getStringProperty("bar"));
|
assertEquals("bar", "123", message.getStringProperty("bar"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSendMessageWithDelay() throws Exception {
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SEND\n" + "AMQ_SCHEDULED_DELAY:5000\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
TextMessage message = (TextMessage)consumer.receive(2000);
|
||||||
|
assertNull(message);
|
||||||
|
message = (TextMessage)consumer.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
public void testSendMessageWithStandardHeaders() throws Exception {
|
public void testSendMessageWithStandardHeaders() throws Exception {
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
Loading…
Reference in New Issue