This closes #665
This commit is contained in:
commit
64e95b9b91
|
@ -269,7 +269,7 @@ public final class ActiveMQClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows programatically configuration of global thread pools properties. This method will update the global
|
* Allows programmatical configuration of global thread pools properties. This method will update the global
|
||||||
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
|
* thread pool configuration based on the provided values notifying all globalThreadPoolListeners.
|
||||||
*
|
*
|
||||||
* Note: If global thread pools have already been created, they will not be updated with these new values.
|
* Note: If global thread pools have already been created, they will not be updated with these new values.
|
||||||
|
|
|
@ -40,7 +40,7 @@ public interface Channel {
|
||||||
long getID();
|
long getID();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This number increases every time the channel reconnects succesfully.
|
* This number increases every time the channel reconnects successfully.
|
||||||
* This is used to guarantee the integrity of the channel on sequential commands such as large messages.
|
* This is used to guarantee the integrity of the channel on sequential commands such as large messages.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<!-- JMS Client because of some Convertions that are done -->
|
<!-- JMS Client because of some conversions that are done -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>artemis-jms-client</artifactId>
|
<artifactId>artemis-jms-client</artifactId>
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class MQTTSessionCallback implements SessionCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int sendMessage(MessageReference referece, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
|
public int sendMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
|
||||||
try {
|
try {
|
||||||
session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
|
session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
private MQTTLogger log = MQTTLogger.LOGGER;
|
||||||
|
|
||||||
// We filter out Artemis managment messages and notifications
|
// We filter out Artemis management messages and notifications
|
||||||
private SimpleString managementFilter;
|
private SimpleString managementFilter;
|
||||||
|
|
||||||
public MQTTSubscriptionManager(MQTTSession session) {
|
public MQTTSubscriptionManager(MQTTSession session) {
|
||||||
|
|
|
@ -845,7 +845,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
CommandProcessor commandProcessorInstance = new CommandProcessor();
|
CommandProcessor commandProcessorInstance = new CommandProcessor();
|
||||||
|
|
||||||
// This will listen for commands throught the protocolmanager
|
// This will listen for commands through the protocolmanager
|
||||||
public class CommandProcessor implements CommandVisitor {
|
public class CommandProcessor implements CommandVisitor {
|
||||||
|
|
||||||
public AMQConnectionContext getContext() {
|
public AMQConnectionContext getContext() {
|
||||||
|
|
|
@ -448,7 +448,7 @@ public class OpenWireMessageConverter implements MessageConverter {
|
||||||
return md;
|
return md;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
|
private static ActiveMQMessage toAMQMessage(MessageReference reference, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
|
||||||
ActiveMQMessage amqMsg = null;
|
ActiveMQMessage amqMsg = null;
|
||||||
byte coreType = coreMessage.getType();
|
byte coreType = coreMessage.getType();
|
||||||
switch (coreType) {
|
switch (coreType) {
|
||||||
|
@ -748,7 +748,7 @@ public class OpenWireMessageConverter implements MessageConverter {
|
||||||
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
|
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
|
||||||
}
|
}
|
||||||
|
|
||||||
amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1);
|
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
|
||||||
|
|
||||||
byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
|
byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
|
||||||
if (replyToBytes != null) {
|
if (replyToBytes != null) {
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<!-- JMS Client because of some Convertions that are done -->
|
<!-- JMS Client because of some conversions that are done -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>artemis-jms-client</artifactId>
|
<artifactId>artemis-jms-client</artifactId>
|
||||||
|
|
|
@ -792,7 +792,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
|
||||||
return buffer.toString();
|
return buffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
// here for backwards compatibilty
|
// here for backwards compatibility
|
||||||
public void setUseDLQ(final boolean b) {
|
public void setUseDLQ(final boolean b) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.activemq.artemis.rest.util.LinkStrategy;
|
||||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Auto-acknowleged consumer
|
* Auto-acknowledged consumer
|
||||||
*/
|
*/
|
||||||
public class QueueConsumer {
|
public class QueueConsumer {
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ public final class BatchingIDGenerator implements IDGenerator {
|
||||||
|
|
||||||
if (!storageManager.isStarted()) {
|
if (!storageManager.isStarted()) {
|
||||||
// This could happen after the server is stopped
|
// This could happen after the server is stopped
|
||||||
// while notifications are being sent and ID gerated.
|
// while notifications are being sent and ID generated.
|
||||||
// If the ID is intended to the journal you would know soon enough
|
// If the ID is intended to the journal you would know soon enough
|
||||||
// so we just ignore this for now
|
// so we just ignore this for now
|
||||||
logger.debug("The journalStorageManager is not loaded. " + "This is probably ok as long as it's a notification being sent after shutdown");
|
logger.debug("The journalStorageManager is not loaded. " + "This is probably ok as long as it's a notification being sent after shutdown");
|
||||||
|
|
|
@ -192,7 +192,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
latch.await(30, TimeUnit.SECONDS);
|
latch.await(30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// We cache the variable as the replicator could be changed between here and the time we call stop
|
// We cache the variable as the replicator could be changed between here and the time we call stop
|
||||||
// since sendLiveIsStoping my issue a close back from the channel
|
// since sendLiveIsStopping may issue a close back from the channel
|
||||||
// and we want to ensure a stop here just in case
|
// and we want to ensure a stop here just in case
|
||||||
ReplicationManager replicatorInUse = replicator;
|
ReplicationManager replicatorInUse = replicator;
|
||||||
if (replicatorInUse != null) {
|
if (replicatorInUse != null) {
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class ActiveMQXAResourceRecovery {
|
||||||
|
|
||||||
public boolean initialise(final String config) {
|
public boolean initialise(final String config) {
|
||||||
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled()) {
|
if (ActiveMQXARecoveryLogger.LOGGER.isTraceEnabled()) {
|
||||||
ActiveMQXARecoveryLogger.LOGGER.trace(this + " intialise: " + config);
|
ActiveMQXARecoveryLogger.LOGGER.trace(this + " initialise: " + config);
|
||||||
}
|
}
|
||||||
|
|
||||||
String[] configs = config.split(";");
|
String[] configs = config.split(";");
|
||||||
|
|
|
@ -74,12 +74,12 @@ public abstract class AutoFailTestSupport extends TestCase {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
// Wait for test to finish succesfully
|
// Wait for test to finish successfully
|
||||||
Thread.sleep(getMaxTestTime());
|
Thread.sleep(getMaxTestTime());
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// This usually means the test was successful
|
// This usually means the test was successful
|
||||||
} finally {
|
} finally {
|
||||||
// Check if the test was able to tear down succesfully,
|
// Check if the test was able to tear down successfully,
|
||||||
// which usually means, it has finished its run.
|
// which usually means, it has finished its run.
|
||||||
if (!isTestSuccess.get()) {
|
if (!isTestSuccess.get()) {
|
||||||
LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms.");
|
LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms.");
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class JmsAutoAckListenerTest extends TestSupport implements MessageListen
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if acknowleged messages are being consumed.
|
* Tests if acknowledged messages are being consumed.
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class JmsAutoAckTest extends TestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if acknowleged messages are being consumed.
|
* Tests if acknowledged messages are being consumed.
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class JmsClientAckListenerTest extends TestSupport implements MessageList
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if acknowleged messages are being consumed.
|
* Tests if acknowledged messages are being consumed.
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
*/
|
*/
|
||||||
|
@ -82,7 +82,7 @@ public class JmsClientAckListenerTest extends TestSupport implements MessageList
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if unacknowleged messages are being redelivered when the consumer
|
* Tests if unacknowledged messages are being redelivered when the consumer
|
||||||
* connects again.
|
* connects again.
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class NetworkLoadTest extends TestCase {
|
||||||
|
|
||||||
forwardingClients = new ForwardingClient[BROKER_COUNT - 1];
|
forwardingClients = new ForwardingClient[BROKER_COUNT - 1];
|
||||||
for (int i = 0; i < forwardingClients.length; i++) {
|
for (int i = 0; i < forwardingClients.length; i++) {
|
||||||
LOG.info("Starting fowarding client " + i);
|
LOG.info("Starting forwarding client " + i);
|
||||||
forwardingClients[i] = new ForwardingClient(i, i + 1);
|
forwardingClients[i] = new ForwardingClient(i, i + 1);
|
||||||
forwardingClients[i].start();
|
forwardingClients[i].start();
|
||||||
}
|
}
|
||||||
|
@ -148,11 +148,11 @@ public class NetworkLoadTest extends TestCase {
|
||||||
@Override
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
for (int i = 0; i < forwardingClients.length; i++) {
|
for (int i = 0; i < forwardingClients.length; i++) {
|
||||||
LOG.info("Stoping fowarding client " + i);
|
LOG.info("Stopping forwarding client " + i);
|
||||||
forwardingClients[i].close();
|
forwardingClients[i].close();
|
||||||
}
|
}
|
||||||
for (int i = 0; i < brokers.length; i++) {
|
for (int i = 0; i < brokers.length; i++) {
|
||||||
LOG.info("Stoping broker " + i);
|
LOG.info("Stopping broker " + i);
|
||||||
brokers[i].stop();
|
brokers[i].stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -448,7 +448,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if the durable sub works in a new connection.
|
// See if the durable sub works in a new connection.
|
||||||
// The embeded broker shutsdown when his connections are closed.
|
// The embedded broker shuts down when its connections are closed.
|
||||||
// So we open the new connection before the old one is closed.
|
// So we open the new connection before the old one is closed.
|
||||||
connection.close();
|
connection.close();
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
|
|
|
@ -102,7 +102,7 @@ public class ThreeBrokerTempDestDemandSubscriptionCleanupTest extends JmsMultipl
|
||||||
assertNotNull("We should have gotten a response, but didn't for iter: " + i, response);
|
assertNotNull("We should have gotten a response, but didn't for iter: " + i, response);
|
||||||
assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText());
|
assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText());
|
||||||
|
|
||||||
// so we close the consumer so that an actual RemoveInfo command gets propogated through the
|
// so we close the consumer so that an actual RemoveInfo command gets propagated through the
|
||||||
// network
|
// network
|
||||||
responseConsumer.close();
|
responseConsumer.close();
|
||||||
conn.close();
|
conn.close();
|
||||||
|
@ -263,4 +263,4 @@ public class ThreeBrokerTempDestDemandSubscriptionCleanupTest extends JmsMultipl
|
||||||
connector.setDuplex(true);
|
connector.setDuplex(true);
|
||||||
return connector;
|
return connector;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,13 +100,13 @@ public class UnmodifiableConnection implements Connection {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
|
public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
|
||||||
// TODO - If implemented this method should return an unmodifiable link isntance.
|
// TODO - If implemented this method should return an unmodifiable link instance.
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Delivery getWorkHead() {
|
public Delivery getWorkHead() {
|
||||||
// TODO - If implemented this method should return an unmodifiable delivery isntance.
|
// TODO - If implemented this method should return an unmodifiable delivery instance.
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class CoreClientTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
// One way around the setting destination problem is as follows -
|
// One way around the setting destination problem is as follows -
|
||||||
// Remove destination as an attribute from client producer.
|
// Remove destination as an attribute from client producer.
|
||||||
// The destination always has to be set explicity before sending a message
|
// The destination always has to be set explicitly before sending a message
|
||||||
|
|
||||||
message.setAddress(QUEUE);
|
message.setAddress(QUEUE);
|
||||||
|
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class MultipleProducersTest extends JMSTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
// send 5 message to queueTwo
|
// send 5 message to queueTwo
|
||||||
// there shoudl be 5 messages on queueTwo
|
// there should be 5 messages on queueTwo
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
sendMessage(queueTwo, session);
|
sendMessage(queueTwo, session);
|
||||||
}
|
}
|
||||||
|
@ -137,7 +137,7 @@ public class MultipleProducersTest extends JMSTestBase {
|
||||||
sendMessage(queueOne, session);
|
sendMessage(queueOne, session);
|
||||||
}
|
}
|
||||||
|
|
||||||
// at the end of the test there shoudl be 5 message on queueOne and 5 messages on queueTwo
|
// at the end of the test there should be 5 message on queueOne and 5 messages on queueTwo
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
|
|
||||||
|
|
|
@ -1517,7 +1517,7 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMoveMessageToUnknownQueue() throws Exception {
|
public void testMoveMessageToUnknownQueue() throws Exception {
|
||||||
String unknwonQueue = RandomUtil.randomString();
|
String unknownQueue = RandomUtil.randomString();
|
||||||
|
|
||||||
String[] messageIDs = JMSUtil.sendMessages(queue, 1);
|
String[] messageIDs = JMSUtil.sendMessages(queue, 1);
|
||||||
|
|
||||||
|
@ -1525,7 +1525,7 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
||||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
queueControl.moveMessage(messageIDs[0], unknwonQueue);
|
queueControl.moveMessage(messageIDs[0], unknownQueue);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.junit.Test;
|
||||||
public class JmsAutoAckTest extends BasicOpenWireTest {
|
public class JmsAutoAckTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests if acknowleged messages are being consumed.
|
* Tests if acknowledged messages are being consumed.
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -191,7 +191,7 @@ public class StompTest extends StompTestBase {
|
||||||
frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
|
frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
|
||||||
sendFrame(frame);
|
sendFrame(frame);
|
||||||
|
|
||||||
// Recieve MQTT Message
|
// Receive MQTT Message
|
||||||
byte[] mqttPayload = clientProvider.receive(10000);
|
byte[] mqttPayload = clientProvider.receive(10000);
|
||||||
clientProvider.disconnect();
|
clientProvider.disconnect();
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,7 @@ public interface Server extends Remote {
|
||||||
// int downCacheSize, boolean manageConfirmations) throws Exception;
|
// int downCacheSize, boolean manageConfirmations) throws Exception;
|
||||||
//
|
//
|
||||||
// /**
|
// /**
|
||||||
// * Creates a topic programatically.
|
// * Creates a topic programmatically.
|
||||||
// */
|
// */
|
||||||
// void deployTopicProgrammatically(String name, String jndiName) throws Exception;
|
// void deployTopicProgrammatically(String name, String jndiName) throws Exception;
|
||||||
//
|
//
|
||||||
|
@ -119,7 +119,7 @@ public interface Server extends Remote {
|
||||||
// int downCacheSize, boolean manageConfirmations) throws Exception;
|
// int downCacheSize, boolean manageConfirmations) throws Exception;
|
||||||
//
|
//
|
||||||
// /**
|
// /**
|
||||||
// * Creates a queue programatically.
|
// * Creates a queue programmatically.
|
||||||
// */
|
// */
|
||||||
// void deployQueueProgrammatically(String name, String jndiName) throws Exception;
|
// void deployQueueProgrammatically(String name, String jndiName) throws Exception;
|
||||||
|
|
||||||
|
@ -130,7 +130,7 @@ public interface Server extends Remote {
|
||||||
// void undeployDestination(boolean isQueue, String name) throws Exception;
|
// void undeployDestination(boolean isQueue, String name) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroys a programatically created destination.
|
* Destroys a programmatically created destination.
|
||||||
*/
|
*/
|
||||||
// boolean undeployDestinationProgrammatically(boolean isQueue, String name) throws Exception;
|
// boolean undeployDestinationProgrammatically(boolean isQueue, String name) throws Exception;
|
||||||
void deployConnectionFactory(String clientId,
|
void deployConnectionFactory(String clientId,
|
||||||
|
|
Loading…
Reference in New Issue