diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQBytesMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQBytesMessage.cs index e43fb1b9b2..9f4188160f 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQBytesMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQBytesMessage.cs @@ -22,24 +22,14 @@ using OpenWire.Client.Core; namespace OpenWire.Client.Commands { - public class ActiveMQBytesMessage : ActiveMQMessage + public class ActiveMQBytesMessage : ActiveMQMessage, IBytesMessage { - public const byte ID_ActiveMQBytesMessage = 24; - - - - - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method - - - public override byte GetDataStructureType() { + public const byte ID_ActiveMQBytesMessage = 24; + + + public override byte GetDataStructureType() + { return ID_ActiveMQBytesMessage; } - - - // Properties - } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs index 639c6e2d8f..d4ade9f169 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs @@ -18,443 +18,513 @@ using System; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { +namespace OpenWire.Client.Commands +{ + + /// + /// Summary description for ActiveMQDestination. + /// + public abstract class ActiveMQDestination : AbstractCommand, IDestination + { - /// - /// Summary description for ActiveMQDestination. - /// - public abstract class ActiveMQDestination : AbstractCommand, IDestination { - - /** - * Topic Destination object - */ - public const int ACTIVEMQ_TOPIC = 1; - /** - * Temporary Topic Destination object - */ - public const int ACTIVEMQ_TEMPORARY_TOPIC = 2; - - /** - * Queue Destination object - */ - public const int ACTIVEMQ_QUEUE = 3; - /** - * Temporary Queue Destination object - */ - public const int ACTIVEMQ_TEMPORARY_QUEUE = 4; - - /** - * prefix for Advisory message destinations - */ - public const String ADVISORY_PREFIX = "ActiveMQ.Advisory."; - - /** - * prefix for consumer advisory destinations - */ - public const String CONSUMER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Consumers."; - - /** - * prefix for producer advisory destinations - */ - public const String PRODUCER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Producers."; - - /** - * prefix for connection advisory destinations - */ - public const String CONNECTION_ADVISORY_PREFIX = ADVISORY_PREFIX + "Connections."; - - /** - * The default target for ordered destinations - */ - public const String DEFAULT_ORDERED_TARGET = "coordinator"; - - private const int NULL_DESTINATION = 10; - - private const String TEMP_PREFIX = "{TD{"; - private const String TEMP_POSTFIX = "}TD}"; - private const String COMPOSITE_SEPARATOR = ","; - private const String QUEUE_PREFIX = "queue://"; - private const String TOPIC_PREFIX = "topic://"; - - - private String physicalName = ""; - - // Cached transient data - private bool exclusive; - private bool ordered; - private bool advisory; - private String orderedTarget = DEFAULT_ORDERED_TARGET; - - - /** - * The Default Constructor - */ - protected ActiveMQDestination() { + /** + * Topic Destination object + */ + public const int ACTIVEMQ_TOPIC = 1; + /** + * Temporary Topic Destination object + */ + public const int ACTIVEMQ_TEMPORARY_TOPIC = 2; + + /** + * Queue Destination object + */ + public const int ACTIVEMQ_QUEUE = 3; + /** + * Temporary Queue Destination object + */ + public const int ACTIVEMQ_TEMPORARY_QUEUE = 4; + + /** + * prefix for Advisory message destinations + */ + public const String ADVISORY_PREFIX = "ActiveMQ.Advisory."; + + /** + * prefix for consumer advisory destinations + */ + public const String CONSUMER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Consumers."; + + /** + * prefix for producer advisory destinations + */ + public const String PRODUCER_ADVISORY_PREFIX = ADVISORY_PREFIX + "Producers."; + + /** + * prefix for connection advisory destinations + */ + public const String CONNECTION_ADVISORY_PREFIX = ADVISORY_PREFIX + "Connections."; + + /** + * The default target for ordered destinations + */ + public const String DEFAULT_ORDERED_TARGET = "coordinator"; + + private const int NULL_DESTINATION = 10; + + private const String TEMP_PREFIX = "{TD{"; + private const String TEMP_POSTFIX = "}TD}"; + private const String COMPOSITE_SEPARATOR = ","; + private const String QUEUE_PREFIX = "queue://"; + private const String TOPIC_PREFIX = "topic://"; + + + private String physicalName = ""; + + // Cached transient data + private bool exclusive; + private bool ordered; + private bool advisory; + private String orderedTarget = DEFAULT_ORDERED_TARGET; + + + /** + * The Default Constructor + */ + protected ActiveMQDestination() + { + } + + /** + * Construct the Destination with a defined physical name; + * + * @param name + */ + protected ActiveMQDestination(String name) + { + this.physicalName = name; + this.advisory = name != null && name.StartsWith(ADVISORY_PREFIX); + } + + + + /** + * @return Returns the advisory. + */ + public bool IsAdvisory() + { + return advisory; + } + /** + * @param advisory The advisory to set. + */ + public void SetAdvisory(bool advisory) + { + this.advisory = advisory; + } + + /** + * @return true if this is a destination for Consumer advisories + */ + public bool IsConsumerAdvisory() + { + return IsAdvisory() && physicalName.StartsWith(CONSUMER_ADVISORY_PREFIX); + } + + /** + * @return true if this is a destination for Producer advisories + */ + public bool IsProducerAdvisory() + { + return IsAdvisory() && physicalName.StartsWith(PRODUCER_ADVISORY_PREFIX); + } + + /** + * @return true if this is a destination for Connection advisories + */ + public bool IsConnectionAdvisory() + { + return IsAdvisory() && physicalName.StartsWith(CONNECTION_ADVISORY_PREFIX); + } + + /** + * @return Returns the exclusive. + */ + public bool IsExclusive() + { + return exclusive; + } + /** + * @param exclusive The exclusive to set. + */ + public void SetExclusive(bool exclusive) + { + this.exclusive = exclusive; + } + /** + * @return Returns the ordered. + */ + public bool IsOrdered() + { + return ordered; + } + /** + * @param ordered The ordered to set. + */ + public void SetOrdered(bool ordered) + { + this.ordered = ordered; + } + /** + * @return Returns the orderedTarget. + */ + public String GetOrderedTarget() + { + return orderedTarget; + } + /** + * @param orderedTarget The orderedTarget to set. + */ + public void SetOrderedTarget(String orderedTarget) + { + this.orderedTarget = orderedTarget; + } + /** + * A helper method to return a descriptive string for the topic or queue + * @param destination + * + * @return a descriptive string for this queue or topic + */ + public static String Inspect(ActiveMQDestination destination) + { + if (destination is ITopic) + { + return "Topic(" + destination.ToString() + ")"; + } + else + { + return "Queue(" + destination.ToString() + ")"; + } + } + + /** + * @param destination + * @return @throws JMSException + * @throws javax.jms.JMSException + */ + public static ActiveMQDestination Transform(IDestination destination) + { + ActiveMQDestination result = null; + if (destination != null) + { + if (destination is ActiveMQDestination) + { + result = (ActiveMQDestination) destination; } - - /** - * Construct the Destination with a defined physical name; - * - * @param name - */ - protected ActiveMQDestination(String name) { - this.physicalName = name; - this.advisory = name != null && name.StartsWith(ADVISORY_PREFIX); - } - - - - /** - * @return Returns the advisory. - */ - public bool IsAdvisory() { - return advisory; - } - /** - * @param advisory The advisory to set. - */ - public void SetAdvisory(bool advisory) { - this.advisory = advisory; - } - - /** - * @return true if this is a destination for Consumer advisories - */ - public bool IsConsumerAdvisory() { - return IsAdvisory() && physicalName.StartsWith(CONSUMER_ADVISORY_PREFIX); - } - - /** - * @return true if this is a destination for Producer advisories - */ - public bool IsProducerAdvisory() { - return IsAdvisory() && physicalName.StartsWith(PRODUCER_ADVISORY_PREFIX); - } - - /** - * @return true if this is a destination for Connection advisories - */ - public bool IsConnectionAdvisory() { - return IsAdvisory() && physicalName.StartsWith(CONNECTION_ADVISORY_PREFIX); - } - - /** - * @return Returns the exclusive. - */ - public bool IsExclusive() { - return exclusive; - } - /** - * @param exclusive The exclusive to set. - */ - public void SetExclusive(bool exclusive) { - this.exclusive = exclusive; - } - /** - * @return Returns the ordered. - */ - public bool IsOrdered() { - return ordered; - } - /** - * @param ordered The ordered to set. - */ - public void SetOrdered(bool ordered) { - this.ordered = ordered; - } - /** - * @return Returns the orderedTarget. - */ - public String GetOrderedTarget() { - return orderedTarget; - } - /** - * @param orderedTarget The orderedTarget to set. - */ - public void SetOrderedTarget(String orderedTarget) { - this.orderedTarget = orderedTarget; - } - /** - * A helper method to return a descriptive string for the topic or queue - * @param destination - * - * @return a descriptive string for this queue or topic - */ - public static String Inspect(ActiveMQDestination destination) { - if (destination is ITopic) { - return "Topic(" + destination.ToString() + ")"; - } else { - return "Queue(" + destination.ToString() + ")"; - } - } - - /** - * @param destination - * @return @throws JMSException - * @throws javax.jms.JMSException - */ - public static ActiveMQDestination Transform(IDestination destination) { - ActiveMQDestination result = null; - if (destination != null) { - if (destination is ActiveMQDestination) { - result = (ActiveMQDestination) destination; - } else { - if (destination is ITemporaryQueue) { - result = new ActiveMQTempQueue(((IQueue) destination).QueueName); - } else if (destination is ITemporaryTopic) { - result = new ActiveMQTempTopic(((ITopic) destination).TopicName); - } else if (destination is IQueue) { - result = new ActiveMQQueue(((IQueue) destination).QueueName); - } else if (destination is ITopic) { - result = new ActiveMQTopic(((ITopic) destination).TopicName); - } - } - } - return result; - } - - /** - * Create a Destination - * @param type - * @param pyhsicalName - * @return - */ - public static ActiveMQDestination CreateDestination(int type, String pyhsicalName) { - ActiveMQDestination result = null; - if (type == ACTIVEMQ_TOPIC) { - result = new ActiveMQTopic(pyhsicalName); - } else if (type == ACTIVEMQ_TEMPORARY_TOPIC) { - result = new ActiveMQTempTopic(pyhsicalName); - } else if (type == ACTIVEMQ_QUEUE) { - result = new ActiveMQQueue(pyhsicalName); - } else { - result = new ActiveMQTempQueue(pyhsicalName); - } - return result; - } - - /** - * Create a temporary name from the clientId - * - * @param clientId - * @return - */ - public static String CreateTemporaryName(String clientId) { - return TEMP_PREFIX + clientId + TEMP_POSTFIX; - } - - /** - * From a temporary destination find the clientId of the Connection that created it - * - * @param destination - * @return the clientId or null if not a temporary destination - */ - public static String GetClientId(ActiveMQDestination destination) { - String answer = null; - if (destination != null && destination.IsTemporary()) { - String name = destination.PhysicalName; - int start = name.IndexOf(TEMP_PREFIX); - if (start >= 0) { - start += TEMP_PREFIX.Length; - int stop = name.LastIndexOf(TEMP_POSTFIX); - if (stop > start && stop < name.Length) { - answer = name.Substring(start, stop); - } - } - } - return answer; - } - - - /** - * @param o object to compare - * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o - */ - public int CompareTo(Object o) { - if (o is ActiveMQDestination) { - return CompareTo((ActiveMQDestination) o); - } - return -1; - } - - /** - * Lets sort by name first then lets sort topics greater than queues - * - * @param that another destination to compare against - * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o - */ - public int CompareTo(ActiveMQDestination that) { - int answer = 0; - if (physicalName != that.physicalName) { - if (physicalName == null) { - return -1; - } else if (that.physicalName == null) { - return 1; - } - answer = physicalName.CompareTo(that.physicalName); - } - if (answer == 0) { - if (IsTopic()) { - if (that.IsQueue()) { - return 1; - } - } else { - if (that.IsTopic()) { - return -1; - } - } - } - return answer; - } - - - /** - * @return Returns the Destination type - */ - - public abstract int GetDestinationType(); - - - public String PhysicalName { - get { return this.physicalName; } - set { this.physicalName = value; } - } - - /** - * Returns true if a temporary Destination - * - * @return true/false - */ - - public bool IsTemporary() { - return GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC - || GetDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE; - } - - /** - * Returns true if a Topic Destination - * - * @return true/false - */ - - public bool IsTopic() { - return GetDestinationType() == ACTIVEMQ_TOPIC - || GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC; - } - - /** - * Returns true if a Queue Destination - * - * @return true/false - */ - public bool IsQueue() { - return !IsTopic(); - } - - /** - * Returns true if this destination represents a collection of - * destinations; allowing a set of destinations to be published to or subscribed - * from in one JMS operation. - *

- * If this destination is a composite then you can call {@link #getChildDestinations()} - * to return the list of child destinations. - * - * @return true if this destination represents a collection of child destinations. - */ - public bool IsComposite() { - return physicalName.IndexOf(COMPOSITE_SEPARATOR) > 0; - } - - /** - * Returns a list of child destinations if this destination represents a composite - * destination. - * - * @return - */ - /*public List GetChildDestinations() { - List answer = new ArrayList(); - StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR); - while (iter.hasMoreTokens()) { - String name = iter.nextToken(); - Destination child = null; - if (name.StartsWith(QUEUE_PREFIX)) { - child = new ActiveMQQueue(name.Substring(QUEUE_PREFIX.Length)); - } - else if (name.StartsWith(TOPIC_PREFIX)) { - child = new ActiveMQTopic(name.Substring(TOPIC_PREFIX.Length)); - } - else { - child = createDestination(name); - } - answer.add(child); + else + { + if (destination is ITemporaryQueue) + { + result = new ActiveMQTempQueue(((IQueue) destination).QueueName); } - if (answer.size() == 1) { - // lets put ourselves inside the collection - // as we are not really a composite destination - answer.set(0, this); + else if (destination is ITemporaryTopic) + { + result = new ActiveMQTempTopic(((ITopic) destination).TopicName); + } + else if (destination is IQueue) + { + result = new ActiveMQQueue(((IQueue) destination).QueueName); + } + else if (destination is ITopic) + { + result = new ActiveMQTopic(((ITopic) destination).TopicName); } - return answer; - }*/ - - /** - * @return string representation of this instance - */ - - public override String ToString() { - return this.physicalName; } - - /** - * @return hashCode for this instance - */ - - public override int GetHashCode() { - int answer = 37; - - if (this.physicalName != null) { - answer = physicalName.GetHashCode(); - } - if (IsTopic()) { - answer ^= 0xfabfab; - } - return answer; + } + return result; + } + + /** + * Create a Destination + * @param type + * @param pyhsicalName + * @return + */ + public static ActiveMQDestination CreateDestination(int type, String pyhsicalName) + { + ActiveMQDestination result = null; + if (type == ACTIVEMQ_TOPIC) + { + result = new ActiveMQTopic(pyhsicalName); + } + else if (type == ACTIVEMQ_TEMPORARY_TOPIC) + { + result = new ActiveMQTempTopic(pyhsicalName); + } + else if (type == ACTIVEMQ_QUEUE) + { + result = new ActiveMQQueue(pyhsicalName); + } + else + { + result = new ActiveMQTempQueue(pyhsicalName); + } + return result; + } + + /** + * Create a temporary name from the clientId + * + * @param clientId + * @return + */ + public static String CreateTemporaryName(String clientId) + { + return TEMP_PREFIX + clientId + TEMP_POSTFIX; + } + + /** + * From a temporary destination find the clientId of the Connection that created it + * + * @param destination + * @return the clientId or null if not a temporary destination + */ + public static String GetClientId(ActiveMQDestination destination) + { + String answer = null; + if (destination != null && destination.IsTemporary()) + { + String name = destination.PhysicalName; + int start = name.IndexOf(TEMP_PREFIX); + if (start >= 0) + { + start += TEMP_PREFIX.Length; + int stop = name.LastIndexOf(TEMP_POSTFIX); + if (stop > start && stop < name.Length) + { + answer = name.Substring(start, stop); + } } - - /** - * if the object passed in is equivalent, return true - * - * @param obj the object to compare - * @return true if this instance and obj are equivalent - */ - - public override bool Equals(Object obj) { - bool result = this == obj; - if (!result && obj != null && obj is ActiveMQDestination) { - ActiveMQDestination other = (ActiveMQDestination) obj; - result = this.GetDestinationType() == other.GetDestinationType() - && this.physicalName.Equals(other.physicalName); - } - return result; + } + return answer; + } + + + /** + * @param o object to compare + * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o + */ + public int CompareTo(Object o) + { + if (o is ActiveMQDestination) + { + return CompareTo((ActiveMQDestination) o); + } + return -1; + } + + /** + * Lets sort by name first then lets sort topics greater than queues + * + * @param that another destination to compare against + * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o + */ + public int CompareTo(ActiveMQDestination that) + { + int answer = 0; + if (physicalName != that.physicalName) + { + if (physicalName == null) + { + return -1; } - - - /** - * @return true if the destination matches multiple possible destinations - */ - public bool IsWildcard() { - if (physicalName != null) { - return physicalName.IndexOf(DestinationFilter.ANY_CHILD) >= 0 - || physicalName.IndexOf(DestinationFilter.ANY_DESCENDENT) >= 0; - } - return false; + else if (that.physicalName == null) + { + return 1; } - - - /** - * Factory method to create a child destination if this destination is a composite - * @param name - * @return the created Destination - */ - public abstract ActiveMQDestination CreateDestination(String name); - } + answer = physicalName.CompareTo(that.physicalName); + } + if (answer == 0) + { + if (IsTopic()) + { + if (that.IsQueue()) + { + return 1; + } + } + else + { + if (that.IsTopic()) + { + return -1; + } + } + } + return answer; + } + + + /** + * @return Returns the Destination type + */ + + public abstract int GetDestinationType(); + + + public String PhysicalName + { + get { return this.physicalName; } + set { this.physicalName = value; } + } + + /** + * Returns true if a temporary Destination + * + * @return true/false + */ + + public bool IsTemporary() + { + return GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC + || GetDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE; + } + + /** + * Returns true if a Topic Destination + * + * @return true/false + */ + + public bool IsTopic() + { + return GetDestinationType() == ACTIVEMQ_TOPIC + || GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC; + } + + /** + * Returns true if a Queue Destination + * + * @return true/false + */ + public bool IsQueue() + { + return !IsTopic(); + } + + /** + * Returns true if this destination represents a collection of + * destinations; allowing a set of destinations to be published to or subscribed + * from in one JMS operation. + *

+ * If this destination is a composite then you can call {@link #getChildDestinations()} + * to return the list of child destinations. + * + * @return true if this destination represents a collection of child destinations. + */ + public bool IsComposite() + { + return physicalName.IndexOf(COMPOSITE_SEPARATOR) > 0; + } + + /** + * Returns a list of child destinations if this destination represents a composite + * destination. + * + * @return + */ + /*public List GetChildDestinations() { + List answer = new ArrayList(); + StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR); + while (iter.hasMoreTokens()) { + String name = iter.nextToken(); + Destination child = null; + if (name.StartsWith(QUEUE_PREFIX)) { + child = new ActiveMQQueue(name.Substring(QUEUE_PREFIX.Length)); + } + else if (name.StartsWith(TOPIC_PREFIX)) { + child = new ActiveMQTopic(name.Substring(TOPIC_PREFIX.Length)); + } + else { + child = createDestination(name); + } + answer.add(child); + } + if (answer.size() == 1) { + // lets put ourselves inside the collection + // as we are not really a composite destination + answer.set(0, this); + } + return answer; + }*/ + + /** + * @return string representation of this instance + */ + + public override String ToString() + { + return this.physicalName; + } + + /** + * @return hashCode for this instance + */ + + public override int GetHashCode() + { + int answer = 37; + + if (this.physicalName != null) + { + answer = physicalName.GetHashCode(); + } + if (IsTopic()) + { + answer ^= 0xfabfab; + } + return answer; + } + + /** + * if the object passed in is equivalent, return true + * + * @param obj the object to compare + * @return true if this instance and obj are equivalent + */ + + public override bool Equals(Object obj) + { + bool result = this == obj; + if (!result && obj != null && obj is ActiveMQDestination) + { + ActiveMQDestination other = (ActiveMQDestination) obj; + result = this.GetDestinationType() == other.GetDestinationType() + && this.physicalName.Equals(other.physicalName); + } + return result; + } + + + /** + * @return true if the destination matches multiple possible destinations + */ + public bool IsWildcard() + { + if (physicalName != null) + { + return physicalName.IndexOf(DestinationFilter.ANY_CHILD) >= 0 + || physicalName.IndexOf(DestinationFilter.ANY_DESCENDENT) >= 0; + } + return false; + } + + + /** + * Factory method to create a child destination if this destination is a composite + * @param name + * @return the created Destination + */ + public abstract ActiveMQDestination CreateDestination(String name); + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMapMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMapMessage.cs index 32c6044921..5f314dd4f6 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMapMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMapMessage.cs @@ -22,24 +22,40 @@ using OpenWire.Client.Core; namespace OpenWire.Client.Commands { - public class ActiveMQMapMessage : ActiveMQMessage + public class ActiveMQMapMessage : ActiveMQMessage, IMapMessage { - public const byte ID_ActiveMQMapMessage = 25; - - - - - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method - - - public override byte GetDataStructureType() { + public const byte ID_ActiveMQMapMessage = 25; + + private PrimitiveMap body; + + + public override byte GetDataStructureType() + { return ID_ActiveMQMapMessage; } - - - // Properties - + + public IPrimitiveMap Body + { + get { + if (body == null) + { + body = PrimitiveMap.Unmarshal(Content); + } + return body; + } + } + + public void BeforeMarshall(OpenWireFormat wireFormat) + { + base.BeforeMarshall(wireFormat); + + if (body == null) { + Content = null; + } + else { + Content = body.Marshal(); + } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs index 2ae8b348c0..9e8a05b8f1 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs @@ -20,49 +20,281 @@ using System.Collections; using OpenWire.Client; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - public class ActiveMQMessage : Message, IMessage, MarshallAware { - public const byte ID_ActiveMQMessage = 23; - - public static ActiveMQMessage Transform(IMessage message) { - return (ActiveMQMessage) message; - } - - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method - - public override byte GetDataStructureType() { - return ID_ActiveMQMessage; - } - - - public override bool IsMarshallAware() { - return true; - } - - // Properties - public IDestination FromDestination { - get { return Destination; } - set { this.Destination = ActiveMQDestination.Transform(value); } - } - - public void BeforeMarshall(OpenWireFormat wireFormat) { - } - - public void AfterMarshall(OpenWireFormat wireFormat) { - } - - public void BeforeUnmarshall(OpenWireFormat wireFormat) { - } - public void AfterUnmarshall(OpenWireFormat wireFormat) { - } - - public void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data) { - } - - public byte[] GetMarshalledForm(OpenWireFormat wireFormat) { - return null; - } +namespace OpenWire.Client.Commands +{ + public class ActiveMQMessage : Message, IMessage, MarshallAware + { + public const byte ID_ActiveMQMessage = 23; + + protected static MessagePropertyHelper propertyHelper = new MessagePropertyHelper(); + + private PrimitiveMap properties; + + + + public static ActiveMQMessage Transform(IMessage message) + { + return (ActiveMQMessage) message; } + + // TODO generate Equals method + // TODO generate GetHashCode method + + + public override byte GetDataStructureType() + { + return ID_ActiveMQMessage; + } + + + // Properties + + public IPrimitiveMap Properties + { + get { + if (properties == null) + { + properties = PrimitiveMap.Unmarshal(MarshalledProperties); + } + return properties; + } + } + + public IDestination FromDestination + { + get { return Destination; } + set { this.Destination = ActiveMQDestination.Transform(value); } + } + + + // IMessage interface + + // JMS headers + + ///

+ /// The correlation ID used to correlate messages with conversations or long running business processes + /// + public string JMSCorrelationID + { + get { + return CorrelationId; + } + set { + CorrelationId = value; + } + } + + /// + /// The destination of the message + /// + public IDestination JMSDestination + { + get { + return OriginalDestination; + } + } + + /// + /// The time in milliseconds that this message should expire in + /// + public long JMSExpiration + { + get { + return Expiration; + } + set { + Expiration = value; + } + } + + /// + /// The message ID which is set by the provider + /// + public string JMSMessageId + { + get { + return DataStreamMarshaller.ToString(MessageId); + } + } + + /// + /// Whether or not this message is persistent + /// + public bool JMSPersistent + { + get { + return Persistent; + } + set { + Persistent = value; + } + } + + /// + /// The Priority on this message + /// + public byte JMSPriority + { + get { + return Priority; + } + set { + Priority = value; + } + } + + /// + /// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully. + /// + public bool JMSRedelivered + { + get { + return RedeliveryCounter > 0; + } + } + + + /// + /// The destination that the consumer of this message should send replies to + /// + public IDestination JMSReplyTo + { + get { + return ReplyTo; + } + set { + ReplyTo = ActiveMQDestination.Transform(value); + } + } + + + /// + /// The timestamp the broker added to the message + /// + public long JMSTimestamp + { + get { + return Timestamp; + } + } + + /// + /// The type name of this message + /// + public string JMSType + { + get { + return Type; + } + set { + Type = value; + } + } + + + // JMS Extension headers + + /// + /// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully. + /// + public int JMSXDeliveryCount + { + get { + return RedeliveryCounter + 1; + } + } + + + /// + /// The Message Group ID used to group messages together to the same consumer for the same group ID value + /// + public string JMSXGroupID + { + get { + return GroupID; + } + set { + GroupID = value; + } + } + /// + /// The Message Group Sequence counter to indicate the position in a group + /// + public int JMSXGroupSeq + { + get { + return GroupSequence; + } + set { + GroupSequence = value; + } + } + + /// + /// Returns the ID of the producers transaction + /// + public string JMSXProducerTXID + { + get { + TransactionId txnId = OriginalTransactionId; + if (txnId == null) + { + txnId = TransactionId; + } + if (txnId != null) + { + return DataStreamMarshaller.ToString(txnId); + } + return null; + } + } + + public object GetObjectProperty(string name) + { + return propertyHelper.GetObjectProperty(this, name); + } + + public void SetObjectProperty(string name, object value) + { + propertyHelper.SetObjectProperty(this, name, value); + } + + // MarshallAware interface + + public override bool IsMarshallAware() + { + return true; + } + + public void BeforeMarshall(OpenWireFormat wireFormat) + { + MarshalledProperties = null; + if (properties != null) + { + MarshalledProperties = properties.Marshal(); + } + } + + public void AfterMarshall(OpenWireFormat wireFormat) + { + } + + public void BeforeUnmarshall(OpenWireFormat wireFormat) + { + } + + public void AfterUnmarshall(OpenWireFormat wireFormat) + { + } + + public void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data) + { + } + + public byte[] GetMarshalledForm(OpenWireFormat wireFormat) + { + return null; + } + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempDestination.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempDestination.cs index 79df7362fa..0fc3402932 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempDestination.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempDestination.cs @@ -1,19 +1,19 @@ /* -* Copyright 2006 The Apache Software Foundation or its licensors, as -* applicable. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ using System; using System.Collections; @@ -34,20 +34,18 @@ namespace OpenWire.Client.Commands public abstract class ActiveMQTempDestination : ActiveMQDestination { public const byte ID_ActiveMQTempDestination = 0; - - - - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method - - - public override byte GetDataStructureType() { + + public ActiveMQTempDestination() : base() + { + } + + public ActiveMQTempDestination(String name) : base(name) + { + } + + public override byte GetDataStructureType() + { return ID_ActiveMQTempDestination; } - - - // Properties - } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs index 08e1418845..9e2599ef28 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs @@ -22,15 +22,16 @@ using OpenWire.Client.Core; namespace OpenWire.Client.Commands { /// - /// Summary description for ActiveMQTempQueue. + /// A Temporary Queue /// - public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue + public class ActiveMQTempQueue : ActiveMQTempDestination, ITemporaryQueue { public const byte ID_ActiveMQTempQueue = 102; public ActiveMQTempQueue() : base() { } + public ActiveMQTempQueue(String name) : base(name) { } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs index 3f92e6cc26..adc3fa5d2d 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs @@ -22,15 +22,16 @@ using OpenWire.Client.Core; namespace OpenWire.Client.Commands { /// - /// Summary description for ActiveMQTempTopic. + /// A Temporary Topic /// - public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic + public class ActiveMQTempTopic : ActiveMQTempDestination, ITemporaryTopic { public const byte ID_ActiveMQTempTopic = 103; public ActiveMQTempTopic() : base() { } + public ActiveMQTempTopic(String name) : base(name) { } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/TransactionId.cs b/openwire-dotnet/src/OpenWire.Client/Commands/TransactionId.cs index 55e4bcb7b0..87387dbc55 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/TransactionId.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/TransactionId.cs @@ -1,19 +1,19 @@ /* -* Copyright 2006 The Apache Software Foundation or its licensors, as -* applicable. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ using System; using System.Collections; @@ -34,42 +34,44 @@ namespace OpenWire.Client.Commands public class TransactionId : AbstractCommand { public const byte ID_TransactionId = 0; - - - public override int GetHashCode() { + + + public override int GetHashCode() + { int answer = 0; return answer; - - } - - - public override bool Equals(object that) { - if (that is TransactionId) { - return Equals((TransactionId) that); - } - return false; - } - - public virtual bool Equals(TransactionId that) { + + } + + + public override bool Equals(object that) + { + if (that is TransactionId) + { + return Equals((TransactionId) that); + } + return false; + } + + public virtual bool Equals(TransactionId that) + { return true; - - } - - - public override string ToString() { + + } + + + public override string ToString() + { return GetType().Name + "[" + " ]"; - - } - - - - public override byte GetDataStructureType() { + + } + + + + public override byte GetDataStructureType() + { return ID_TransactionId; } - - - // Properties - } } diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index a4a424e1d7..ce725e065a 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -1,19 +1,3 @@ -/* - * Copyright 2006 The Apache Software Foundation or its licensors, as - * applicable. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ using System; using System.Collections; using OpenWire.Client.Commands; @@ -39,7 +23,8 @@ namespace OpenWire.Client private bool closed; private AcknowledgementMode acknowledgementMode; private long sessionCounter; - private IDictionary consumers = new Hashtable();// TODO threadsafe + private long temporaryDestinationCounter; + private IDictionary consumers = new Hashtable(); // TODO threadsafe public Connection(ITransport transport, ConnectionInfo info) @@ -49,7 +34,7 @@ namespace OpenWire.Client this.transport.Command += new CommandHandler(OnCommand); this.transport.Start(); } - + /// /// Starts message delivery for this connection. /// @@ -57,7 +42,8 @@ namespace OpenWire.Client { } - /// + + /// /// Stop message delivery for this connection. /// public void Stop() @@ -159,6 +145,18 @@ namespace OpenWire.Client return answer; } + + /// + /// Creates a new temporary destination name + /// + public String CreateTemporaryDestinationName() + { + lock (this) + { + return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter); + } + } + protected void CheckConnected() { if (closed) diff --git a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs index 7a91b1e48b..edd66725e2 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs @@ -17,10 +17,12 @@ using System; using System.IO; using System.Net; +using System.Text; using OpenWire.Client.Commands; using OpenWire.Client.Core; using OpenWire.Client.IO; +using System.Collections; namespace OpenWire.Client.Core { @@ -30,6 +32,36 @@ namespace OpenWire.Client.Core /// public abstract class DataStreamMarshaller { + public const byte NULL = 0; + public const byte BOOLEAN_TYPE = 1; + public const byte BYTE_TYPE = 2; + public const byte CHAR_TYPE = 3; + public const byte SHORT_TYPE = 4; + public const byte INTEGER_TYPE = 5; + public const byte LONG_TYPE = 6; + public const byte DOUBLE_TYPE = 7; + public const byte FLOAT_TYPE = 8; + public const byte STRING_TYPE = 9; + public const byte BYTE_ARRAY_TYPE = 10; + + private static String[] HEX_TABLE = new String[]{ + "00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f", + "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f", + "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f", + "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f", + "40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f", + "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f", + "60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f", + "70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f", + "80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f", + "90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f", + "a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af", + "b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf", + "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf", + "d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df", + "e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef", + "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff", + }; public abstract DataStructure CreateObject(); public abstract byte GetDataStructureType(); @@ -156,19 +188,11 @@ namespace OpenWire.Client.Core { if (bs.ReadBoolean()) { - int size = ReadShort(dataIn); - byte[] data = new byte[size]; - dataIn.Read(data, 0, size); - char[] text = new char[size]; - for (int i = 0; i < size; i++) - { - text[i] = (char) data[i]; - } - return new String(text); + return ReadAsciiString(dataIn); } else { - return dataIn.ReadString(); + return ReadUTF8(dataIn); } } else @@ -177,48 +201,61 @@ namespace OpenWire.Client.Core } } + protected virtual String ReadAsciiString(BinaryReader dataIn) + { + int size = ReadShort(dataIn); + byte[] data = new byte[size]; + dataIn.Read(data, 0, size); + char[] text = new char[size]; + for (int i = 0; i < size; i++) + { + text[i] = (char) data[i]; + } + return new String(text); + } + protected virtual int WriteString(String value, BooleanStream bs) { bs.WriteBoolean(value != null); if (value != null) { int strlen = value.Length; - + // TODO until we get UTF8 working, lets just force ASCII bs.WriteBoolean(true); return strlen + 2; /* - int utflen = 0; - int c = 0; - bool isOnlyAscii = true; - char[] charr = value.ToCharArray(); - for (int i = 0; i < strlen; i++) - { - c = charr[i]; - if ((c >= 0x0001) && (c <= 0x007F)) - { - utflen++; - } - else if (c > 0x07FF) - { - utflen += 3; - isOnlyAscii = false; - } - else - { - isOnlyAscii = false; - utflen += 2; - } - } - - if (utflen >= Int16.MaxValue) - throw new IOException("Encountered a String value that is too long to encode."); - - bs.WriteBoolean(isOnlyAscii); - return utflen + 2; - */ + int utflen = 0; + int c = 0; + bool isOnlyAscii = true; + char[] charr = value.ToCharArray(); + for (int i = 0; i < strlen; i++) + { + c = charr[i]; + if ((c >= 0x0001) && (c <= 0x007F)) + { + utflen++; + } + else if (c > 0x07FF) + { + utflen += 3; + isOnlyAscii = false; + } + else + { + isOnlyAscii = false; + utflen += 2; + } + } + + if (utflen >= Int16.MaxValue) + throw new IOException("Encountered a String value that is too long to encode."); + + bs.WriteBoolean(isOnlyAscii); + return utflen + 2; + */ } else { @@ -236,7 +273,8 @@ namespace OpenWire.Client.Core WriteShort((short) value.Length, dataOut); // now lets write the bytes char[] chars = value.ToCharArray(); - for (int i = 0; i < chars.Length; i++) { + for (int i = 0; i < chars.Length; i++) + { WriteByte((byte) chars[i], dataOut); } } @@ -280,7 +318,7 @@ namespace OpenWire.Client.Core public static void WriteChar(char value, BinaryWriter dataOut) { - dataOut.Write(SwitchEndian(value)); + dataOut.Write(SwitchEndian((short) value)); } public static void WriteShort(short value, BinaryWriter dataOut) @@ -313,7 +351,8 @@ namespace OpenWire.Client.Core public static long SwitchEndian(long x) { long answer = 0; - for (int i = 0; i < 8; i++) { + for (int i = 0; i < 8; i++) + { long lowest = x & 0xff; x >>= 8; answer <<= 8; @@ -528,5 +567,418 @@ namespace OpenWire.Client.Core WriteString(o.Message, dataOut, bs); } } + + /// + /// Marshals the primitive type map to a byte array + /// + public static byte[] MarshalPrimitiveMap(IDictionary map) + { + if (map == null) + { + return null; + } + else + { + MemoryStream memoryStream = new MemoryStream(); + MarshalPrimitiveMap(map, new BinaryWriter(memoryStream)); + return memoryStream.GetBuffer(); + } + } + public static void MarshalPrimitiveMap(IDictionary map, BinaryWriter dataOut) + { + if (map == null) + { + WriteInt(-1, dataOut); + } + else + { + WriteInt(map.Count, dataOut); + foreach (DictionaryEntry entry in map) + { + String name = (String) entry.Key; + WriteUTF8(name, dataOut); + Object value = entry.Value; + MarshalPrimitive(dataOut, value); + } + }} + + + + /// + /// Unmarshals the primitive type map from the given byte array + /// + public static IDictionary UnmarshalPrimitiveMap(byte[] data) + { + if (data == null) + { + return new Hashtable(); + } + else + { + return UnmarshalPrimitiveMap(new BinaryReader(new MemoryStream(data))); + } + } + + public static IDictionary UnmarshalPrimitiveMap(BinaryReader dataIn) + { + int size = ReadInt(dataIn); + if (size < 0) + { + return null; + } + else + { + IDictionary answer = new Hashtable(size); + for (int i=0; i < size; i++) + { + String name = ReadUTF8(dataIn); + answer[name] = UnmarshalPrimitive(dataIn); + } + return answer; + } + + } + + public static void MarshalPrimitive(BinaryWriter dataOut, Object value) + { + if (value == null) + { + WriteByte(NULL, dataOut); + } + else if (value is bool) + { + WriteByte(BOOLEAN_TYPE, dataOut); + WriteBoolean((bool) value, dataOut); + } + else if (value is byte) + { + WriteByte(BYTE_TYPE, dataOut); + WriteByte(((Byte)value), dataOut); + } + else if (value is char) + { + WriteByte(CHAR_TYPE, dataOut); + WriteChar((char) value, dataOut); + } + else if (value is short) + { + WriteByte(SHORT_TYPE, dataOut); + WriteShort((short) value, dataOut); + } + else if (value is int) + { + WriteByte(INTEGER_TYPE, dataOut); + WriteInt((int) value, dataOut); + } + else if (value is long) + { + WriteByte(LONG_TYPE, dataOut); + WriteLong((long) value, dataOut); + } + else if (value is float) + { + WriteByte(FLOAT_TYPE, dataOut); + WriteFloat((float) value, dataOut); + } + else if (value is double) + { + WriteByte(DOUBLE_TYPE, dataOut); + WriteDouble((double) value, dataOut); + } + else if (value is byte[]) + { + byte[] data = (byte[]) value; + WriteByte(BYTE_ARRAY_TYPE, dataOut); + WriteInt(data.Length, dataOut); + dataOut.Write(data); + } + else if (value is string) + { + WriteByte(STRING_TYPE, dataOut); + WriteUTF8((string) value, dataOut); + } + else + { + throw new IOException("Object is not a primitive: " + value); + } + } + + public static Object UnmarshalPrimitive(BinaryReader dataIn) + { + Object value=null; + switch (ReadByte(dataIn)) + { + case BYTE_TYPE: + value = ReadByte(dataIn); + break; + case BOOLEAN_TYPE: + value = ReadBoolean(dataIn); + break; + case CHAR_TYPE: + value = ReadChar(dataIn); + break; + case SHORT_TYPE: + value = ReadShort(dataIn); + break; + case INTEGER_TYPE: + value = ReadInt(dataIn); + break; + case LONG_TYPE: + value = ReadLong(dataIn); + break; + case FLOAT_TYPE: + value = ReadFloat(dataIn); + break; + case DOUBLE_TYPE: + value = ReadDouble(dataIn); + break; + case BYTE_ARRAY_TYPE: + int size = ReadInt(dataIn); + byte[] data = new byte[size]; + dataIn.Read(data, 0, size); + value = data; + break; + case STRING_TYPE: + value = ReadUTF8(dataIn); + break; + } + return value; + } + + private static Object ReadDouble(BinaryReader dataIn) + { + // TODO: Implement this method + return dataIn.ReadDouble(); + } + + /// + /// Method ReadFloat + /// + /// A BinaryReader + /// An Object + private static Object ReadFloat(BinaryReader dataIn) + { + // TODO: Implement this method + return (float) dataIn.ReadDouble(); + } + + private static Object ReadBoolean(BinaryReader dataIn) + { + // TODO: Implement this method + return dataIn.ReadBoolean(); + } + + private static void WriteDouble(double value, BinaryWriter dataOut) + { + // TODO: Implement this method + dataOut.Write(value); + } + + private static void WriteFloat(float value, BinaryWriter dataOut) + { + // TODO: Implement this method + dataOut.Write(value); + } + + private static void WriteBoolean(bool value, BinaryWriter dataOut) + { + // TODO: Implement this method + dataOut.Write(value); + } + + + public static void WriteUTF8(String text, BinaryWriter dataOut) + { + if (text != null) + { + int strlen = text.Length; + int utflen = 0; + int c, count = 0; + + char[] charr = text.ToCharArray(); + + for (int i = 0; i < strlen; i++) + { + c = charr[i]; + if ((c >= 0x0001) && (c <= 0x007F)) + { + utflen++; + } + else if (c > 0x07FF) + { + utflen += 3; + } + else + { + utflen += 2; + } + } + + WriteInt(utflen, dataOut); + byte[] bytearr = new byte[utflen]; + /* + byte[] bytearr = new byte[utflen + 4]; + bytearr[count++] = (byte) ((utflen >>> 24) & 0xFF); + bytearr[count++] = (byte) ((utflen >>> 16) & 0xFF); + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); + */ + for (int i = 0; i < strlen; i++) + { + c = charr[i]; + if ((c >= 0x0001) && (c <= 0x007F)) + { + bytearr[count++] = (byte) c; + } + else if (c > 0x07FF) + { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + else + { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + dataOut.Write(bytearr); + + } + else + { + WriteInt(-1, dataOut); + } + } + + public static String ReadUTF8(BinaryReader dataIn) + { + int utflen = ReadInt(dataIn); + if (utflen > -1) + { + StringBuilder str = new StringBuilder(utflen); + + byte[] bytearr = new byte[utflen]; + int c, char2, char3; + int count = 0; + + dataIn.Read(bytearr, 0, utflen); + + while (count < utflen) + { + c = bytearr[count] & 0xff; + switch (c >> 4) + { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + str.Append((char) c); + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx */ + count += 2; + if (count > utflen) + { + throw CreateDataFormatException(); + } + char2 = bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) + { + throw CreateDataFormatException(); + } + str.Append((char) (((c & 0x1F) << 6) | (char2 & 0x3F))); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) + { + throw CreateDataFormatException(); + } + char2 = bytearr[count - 2]; + char3 = bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) + { + throw CreateDataFormatException(); + } + str.Append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0))); + break; + default : + /* 10xx xxxx, 1111 xxxx */ + throw CreateDataFormatException(); + } + } +// The number of chars produced may be less than utflen + return str.ToString(); + } + else + { + return null; + } + } + + private static Exception CreateDataFormatException() + { + // TODO: implement a better exception + return new Exception("Data format error!"); + } + + + /// + /// Converts the object to a String + /// + public static string ToString(MessageId id) + { + return ToString(id.ProducerId) + ":" + id.ProducerSequenceId; + } + /// + /// Converts the object to a String + /// + public static string ToString(ProducerId id) + { + return id.ConnectionId + ":" + id.SessionId + ":" + id.Value; + } + + + /// + /// Converts the given transaction ID into a String + /// + public static String ToString(TransactionId txnId) + { + if (txnId is LocalTransactionId) + { + LocalTransactionId ltxnId = (LocalTransactionId) txnId; + return "" + ltxnId.Value; + } + else if (txnId is XATransactionId) + { + XATransactionId xaTxnId = (XATransactionId) txnId; + return "XID:" + xaTxnId.FormatId + ":" + ToHexFromBytes(xaTxnId.GlobalTransactionId) + ":" + ToHexFromBytes(xaTxnId.BranchQualifier); + } + return null; + } + + /// + /// Creates the byte array into hexidecimal + /// + public static String ToHexFromBytes(byte[] data) + { + StringBuilder buffer = new StringBuilder(data.Length * 2); + for (int i = 0; i < data.Length; i++) + { + buffer.Append(HEX_TABLE[0xFF & data[i]]); + } + return buffer.ToString(); + } + } } + diff --git a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs index ada65d31c9..2255d17bcb 100644 --- a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs @@ -18,7 +18,7 @@ using System.Collections; using OpenWire.Client.Commands; using System; -namespace OpenWire.Client +namespace OpenWire.Client.Core { /// /// Handles the multi-threaded dispatching between the transport and the consumers @@ -42,8 +42,10 @@ namespace OpenWire.Client /// An IMessage public IMessage DequeueNoWait() { - lock (queue) { - if (queue.Peek() != null) { + lock (queue) + { + if (queue.Peek() != null) + { return (IMessage) queue.Dequeue(); } } @@ -69,7 +71,7 @@ namespace OpenWire.Client { return (IMessage) queue.Dequeue(); } - + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/MessagePropertyHelper.cs b/openwire-dotnet/src/OpenWire.Client/Core/MessagePropertyHelper.cs new file mode 100644 index 0000000000..46258688d9 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/Core/MessagePropertyHelper.cs @@ -0,0 +1,57 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +using System; +using System.Collections; + +using OpenWire.Client; +using OpenWire.Client.Core; +using OpenWire.Client.Commands; + +namespace OpenWire.Client.Core +{ + public delegate object PropertyGetter(ActiveMQMessage message); + public delegate void PropertySetter(ActiveMQMessage message, object value); + + public class MessagePropertyHelper + { + private IDictionary setters = new Hashtable(); + private IDictionary getters = new Hashtable(); + + public MessagePropertyHelper() + { + // TODO find all of the JMS properties via introspection + } + + + public object GetObjectProperty(ActiveMQMessage message, string name) { + object getter = getters[name]; + if (getter != null) { + } + return message.Properties[name]; + } + + public void SetObjectProperty(ActiveMQMessage message, string name, object value) { + PropertySetter setter = (PropertySetter) setters[name]; + if (setter != null) { + setter(message, value); + } + else { + message.Properties[name] = value; + } + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/Core/PrimitiveMap.cs b/openwire-dotnet/src/OpenWire.Client/Core/PrimitiveMap.cs new file mode 100644 index 0000000000..151ec83bef --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/Core/PrimitiveMap.cs @@ -0,0 +1,241 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +using System; +using System.Collections; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace OpenWire.Client.Core +{ + /// + /// A default implementation of IPrimitiveMap + /// + public class PrimitiveMap : IPrimitiveMap + { + private IDictionary dictionary = new Hashtable(); + + + /// + /// Unmarshalls the map from the given data or if the data is null just + /// return an empty map + /// + public static PrimitiveMap Unmarshal(byte[] data) + { + PrimitiveMap answer = new PrimitiveMap(); + answer.dictionary = DataStreamMarshaller.UnmarshalPrimitiveMap(data); + return answer; + } + + public byte[] Marshal() + { + return DataStreamMarshaller.MarshalPrimitiveMap(dictionary); + } + + + public void Clear() + { + dictionary.Clear(); + } + + public bool Contains(Object key) + { + return dictionary.Contains(key); + } + + public void Remove(Object key) + { + dictionary.Remove(key); + } + + + public int Count + { + get { + return dictionary.Count; + } + } + + public ICollection Keys + { + get { + return dictionary.Keys; + } + } + + public ICollection Values + { + get { + return dictionary.Values; + } + } + + public object this[string key] + { + get { + return GetValue(key); + } + set { + CheckValidType(value); + SetValue(key, value); + } + } + + public string GetString(string key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(string)); + return (string) value; + } + + public void SetString(string key, string value) + { + SetValue(key, value); + } + + public bool GetBool(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(bool)); + return (bool) value; + } + + public void SetByte(String key, bool value) + { + SetValue(key, value); + } + + public byte GetByte(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(byte)); + return (byte) value; + } + + public void SetByte(String key, byte value) + { + SetValue(key, value); + } + + public char GetChar(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(char)); + return (char) value; + } + + public void SetChar(String key, char value) + { + SetValue(key, value); + } + + public short GetShort(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(short)); + return (short) value; + } + + public void SetShort(String key, short value) + { + SetValue(key, value); + } + + public int GetInt(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(int)); + return (int) value; + } + + public void SetInt(String key, int value) + { + SetValue(key, value); + } + + public long GetLong(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(long)); + return (long) value; + } + + public void SetLong(String key, long value) + { + SetValue(key, value); + } + + public float GetFloat(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(float)); + return (float) value; + } + + public void SetFloat(String key, float value) + { + SetValue(key, value); + } + + public double GetDouble(String key) + { + Object value = GetValue(key); + CheckValueType(value, typeof(double)); + return (double) value; + } + + public void SetDouble(String key, double value) + { + SetValue(key, value); + } + + + + + protected virtual void SetValue(String key, Object value) + { + dictionary[key] = value; + } + + + protected virtual Object GetValue(String key) + { + return dictionary[key]; + } + + protected virtual void CheckValueType(Object value, Type type) + { + if (! type.IsInstanceOfType(value)) + { + throw new OpenWireException("Expected type: " + type.Name + " but was: " + value); + } + } + + protected virtual void CheckValidType(Object value) + { + if (value != null) + { + Type type = value.GetType(); + if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string))) + { + throw new OpenWireException("Invalid type: " + type.Name + " for value: " + value); + } + } + } + + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/IStopable.cs b/openwire-dotnet/src/OpenWire.Client/IBytesMessage.cs old mode 100755 new mode 100644 similarity index 72% rename from openwire-dotnet/src/OpenWire.Client/IStopable.cs rename to openwire-dotnet/src/OpenWire.Client/IBytesMessage.cs index c824724df6..d357c1119a --- a/openwire-dotnet/src/OpenWire.Client/IStopable.cs +++ b/openwire-dotnet/src/OpenWire.Client/IBytesMessage.cs @@ -13,11 +13,22 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ +using System; +using OpenWire.Client.Commands; + namespace OpenWire.Client { - public interface IStopable + /// + /// Represents a binary based message + /// + public interface IBytesMessage : IMessage { - void Stop(); + + byte[] Content + { + get; + set; + } } } diff --git a/openwire-dotnet/src/OpenWire.Client/IConnection.cs b/openwire-dotnet/src/OpenWire.Client/IConnection.cs index 8ec0eb9a47..e64893f12c 100755 --- a/openwire-dotnet/src/OpenWire.Client/IConnection.cs +++ b/openwire-dotnet/src/OpenWire.Client/IConnection.cs @@ -20,14 +20,14 @@ using OpenWire.Client.Commands; namespace OpenWire.Client { public enum AcknowledgementMode { - Unknown, AutoAcknowledge, ClientAcknowledge, Transactional + Unknown, AutoAcknowledge, ClientAcknowledge, Transactional } /// /// Represents a connection with a message broker /// - public interface IConnection : IDisposable, IStartable, IStopable { + public interface IConnection : IDisposable, IStartable { /// /// Creates a new session to work on this connection @@ -44,20 +44,20 @@ namespace OpenWire.Client { bool Transacted { get; - set; + set; } AcknowledgementMode AcknowledgementMode { get; - set; + set; } - String ClientId - { - get; - set; - } + String ClientId + { + get; + set; + } - - } + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IMapMessage.cs b/openwire-dotnet/src/OpenWire.Client/IMapMessage.cs new file mode 100644 index 0000000000..a42eae84a5 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IMapMessage.cs @@ -0,0 +1,33 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client +{ + /// + /// Represents a Map message which contains key and value pairs which are + /// of primitive types + /// + public interface IMapMessage : IMessage + { + IPrimitiveMap Body { + get; + } + } +} + diff --git a/openwire-dotnet/src/OpenWire.Client/IMessage.cs b/openwire-dotnet/src/OpenWire.Client/IMessage.cs index 9a132f4211..37d838defb 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessage.cs @@ -17,16 +17,145 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { +namespace OpenWire.Client +{ /// /// Represents a message either to be sent to a message broker or received from a message broker /// - public interface IMessage { - - IDestination FromDestination { - get; + public interface IMessage + { + + /// + /// Provides access to the message properties (headers) + /// + IPrimitiveMap Properties { + get; } - - - } + + /// + /// The correlation ID used to correlate messages from conversations or long running business processes + /// + string JMSCorrelationID + { + get; + set; + } + + /// + /// The destination of the message + /// + IDestination JMSDestination + { + get; + } + + /// + /// The time in milliseconds that this message should expire in + /// + long JMSExpiration + { + get; + set; + } + + /// + /// The message ID which is set by the provider + /// + string JMSMessageId + { + get; + } + + /// + /// Whether or not this message is persistent + /// + bool JMSPersistent + { + get; + set; + } + + /// + /// The Priority on this message + /// + byte JMSPriority + { + get; + set; + } + + /// + /// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully. + /// + bool JMSRedelivered + { + get; + } + + + /// + /// The destination that the consumer of this message should send replies to + /// + IDestination JMSReplyTo + { + get; + set; + } + + + /// + /// The timestamp the broker added to the message + /// + long JMSTimestamp + { + get; + } + + /// + /// The type name of this message + /// + string JMSType + { + get; + set; + } + + + // JMS Extension headers + + /// + /// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully. + /// + int JMSXDeliveryCount + { + get; + } + + + /// + /// The Message Group ID used to group messages together to the same consumer for the same group ID value + /// + string JMSXGroupID + { + get; + set; + } + /// + /// The Message Group Sequence counter to indicate the position in a group + /// + int JMSXGroupSeq + { + get; + set; + } + + /// + /// Returns the ID of the producers transaction + /// + string JMSXProducerTXID + { + get; + } + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IPrimitiveMap.cs b/openwire-dotnet/src/OpenWire.Client/IPrimitiveMap.cs new file mode 100644 index 0000000000..0f86621fa7 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IPrimitiveMap.cs @@ -0,0 +1,86 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +using System; +using System.Collections; + +using OpenWire.Client.Commands; + +namespace OpenWire.Client +{ + /// + /// Represents a Map of primitive types where the keys are all string instances + /// and the values are strings or numbers. + /// + public interface IPrimitiveMap + { + + void Clear(); + + bool Contains(object key); + + void Remove(object key); + + int Count + { + get; + } + + ICollection Keys + { + get; + } + + ICollection Values + { + get; + } + + object this[string key] + { + get; + set; + } + + string GetString(string key); + void SetString(string key, string value); + + bool GetBool(string key); + void SetByte(string key, bool value); + + byte GetByte(string key); + void SetByte(string key, byte value); + + char GetChar(string key); + void SetChar(string key, char value); + + short GetShort(string key); + void SetShort(string key, short value); + + int GetInt(string key); + void SetInt(string key, int value); + + long GetLong(string key); + void SetLong(string key, long value); + + float GetFloat(string key); + void SetFloat(string key, float value); + + double GetDouble(string key); + void SetDouble(string key, double value); + + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/ISession.cs b/openwire-dotnet/src/OpenWire.Client/ISession.cs index 9d13713d47..a8b27c709e 100755 --- a/openwire-dotnet/src/OpenWire.Client/ISession.cs +++ b/openwire-dotnet/src/OpenWire.Client/ISession.cs @@ -17,34 +17,39 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { +namespace OpenWire.Client +{ /// /// Represents a single unit of work on an IConnection. /// So the ISession can be used to perform transactional receive and sends /// - public interface ISession : IDisposable { - + public interface ISession : IDisposable + { + + + + /// /// Creates a producer of messages /// IMessageProducer CreateProducer(); - + /// /// Creates a producer of messages on a given destination /// - IMessageProducer CreateProducer(IDestination destination); - + IMessageProducer CreateProducer(IDestination destination); + /// /// Creates a consumer of messages on a given destination /// - IMessageConsumer CreateConsumer(IDestination destination); - + IMessageConsumer CreateConsumer(IDestination destination); + /// /// Creates a consumer of messages on a given destination with a selector /// IMessageConsumer CreateConsumer(IDestination destination, string selector); - - /// + + /// /// Creates a named durable consumer of messages on a given destination with a selector /// IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal); @@ -52,26 +57,56 @@ namespace OpenWire.Client { /// /// Returns the queue for the given name /// - IQueue GetQueue(string name); + IQueue GetQueue(string name); /// /// Returns the topic for the given name /// - ITopic GetTopic(string name); + ITopic GetTopic(string name); + + + /// + /// Creates a temporary queue + /// + ITemporaryQueue CreateTemporaryQueue(); + /// + /// Creates a temporary topic + /// + ITemporaryTopic CreateTemporaryTopic(); + + + // Factory methods to create messages + /// /// Creates a new message with an empty body /// IMessage CreateMessage(); - + /// /// Creates a new text message with an empty body /// ITextMessage CreateTextMessage(); - + /// /// Creates a new text message with the given body /// ITextMessage CreateTextMessage(string text); - } + + /// + /// Creates a new Map message which contains primitive key and value pairs + /// + IMapMessage CreateMapMessage(); + + /// + /// Creates a new binary message + /// + IBytesMessage CreateBytesMessage(); + + /// + /// Creates a new binary message with the given body + /// + IBytesMessage CreateBytesMessage(byte[] body); + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs b/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs index bf5459a6a3..ab3c450aa7 100755 --- a/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs @@ -17,15 +17,18 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { - /// - /// Represents a text based message - /// - public interface ITextMessage : IMessage { - - string Text { - get; - set; - } - } +namespace OpenWire.Client +{ + /// + /// Represents a text based message + /// + public interface ITextMessage : IMessage + { + + string Text + { + get; + set; + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs index 90dfca9abf..49ee118323 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -18,6 +18,7 @@ using System; using System.Collections; using System.Threading; using OpenWire.Client.Commands; +using OpenWire.Client.Core; namespace OpenWire.Client { diff --git a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs index 76799c49f1..3496e3eb31 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs @@ -28,13 +28,13 @@ namespace OpenWire.Client private Session session; private ProducerInfo info; private long messageCounter; - + bool persistent; long timeToLive; int priority; bool disableMessageID; bool disableMessageTimestamp; - + public MessageProducer(Session session, ProducerInfo info) { this.session = session; @@ -57,7 +57,7 @@ namespace OpenWire.Client ActiveMQMessage activeMessage = (ActiveMQMessage) message; activeMessage.MessageId = id; activeMessage.ProducerId = info.ProducerId; - activeMessage.Destination = (ActiveMQDestination) destination; + activeMessage.Destination = ActiveMQDestination.Transform(destination); session.DoSend(destination, message); } @@ -71,14 +71,13 @@ namespace OpenWire.Client { get { return persistent; } set { this.persistent = value; } - } + } public long TimeToLive { get { return timeToLive; } set { this.timeToLive = value; } - } - +} public int Priority { get { return priority; } diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index 473911d095..7f1b57bc76 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -124,6 +124,18 @@ namespace OpenWire.Client return new ActiveMQTopic(name); } + public ITemporaryQueue CreateTemporaryQueue() + { + return new ActiveMQTempQueue(connection.CreateTemporaryDestinationName()); + } + + public ITemporaryTopic CreateTemporaryTopic() + { + return new ActiveMQTempTopic(connection.CreateTemporaryDestinationName()); + } + + + public IMessage CreateMessage() { ActiveMQMessage answer = new ActiveMQMessage(); @@ -146,6 +158,26 @@ namespace OpenWire.Client return answer; } + public IMapMessage CreateMapMessage() + { + return new ActiveMQMapMessage(); + } + + public IBytesMessage CreateBytesMessage() + { + return new ActiveMQBytesMessage(); + } + + public IBytesMessage CreateBytesMessage(byte[] body) + { + ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); + answer.Content = body; + return answer; + } + + + + // Implementation methods public void DoSend(IDestination destination, IMessage message) { @@ -181,7 +213,7 @@ namespace OpenWire.Client id.Value = ++consumerCounter; } answer.ConsumerId = id; - answer.Destination = (ActiveMQDestination) destination; + answer.Destination = ActiveMQDestination.Transform(destination); answer.Selector = selector; answer.PrefetchSize = prefetchSize; @@ -200,7 +232,7 @@ namespace OpenWire.Client id.Value = ++producerCounter; } answer.ProducerId = id; - answer.Destination = (ActiveMQDestination) destination; + answer.Destination = ActiveMQDestination.Transform(destination); return answer; } diff --git a/openwire-dotnet/tests/OpenWire.Client/BytesMessageTest.cs b/openwire-dotnet/tests/OpenWire.Client/BytesMessageTest.cs new file mode 100644 index 0000000000..4975ce0bd4 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/BytesMessageTest.cs @@ -0,0 +1,58 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +using System; +using System.IO; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace OpenWire.Client +{ + [ TestFixture ] + public class BytesMessageTest : TestSupport + { + byte[] expected = {1, 2, 3, 4, 5, 6, 7, 8}; + + [ Test ] + public override void SendAndSyncReceive() + { + base.SendAndSyncReceive(); + } + + protected override IMessage CreateMessage(ISession session) + { + IBytesMessage request = session.CreateBytesMessage(expected); + return request; + } + + protected override void AssertValidMessage(IMessage message) + { + Assert.IsTrue(message is IBytesMessage, "Did not receive a IBytesMessage: " + message); + + Console.WriteLine("Received IBytesMessage: " + message); + + IBytesMessage bytesMessage = (IBytesMessage) message; + byte[] actual = bytesMessage.Content; + Console.WriteLine("Received message with content: " + actual); + Assert.AreEqual(expected, actual, "the message content"); + } + + } +} + diff --git a/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs b/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs index 7c9cf4e3c6..2770948820 100644 --- a/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs @@ -27,46 +27,28 @@ namespace OpenWire.Client [ TestFixture ] public class ClientTest : TestSupport { + string expected = "Hello World!"; + [ Test ] - public void SendAndSyncReceive() + public override void SendAndSyncReceive() { - IConnectionFactory factory = new ConnectionFactory("localhost", 61616); - - Assert.IsTrue(factory != null, "no factory created"); - - using (IConnection connection = factory.CreateConnection()) - { - try - { - Assert.IsTrue(connection != null, "no connection created"); - Console.WriteLine("Connected to ActiveMQ!"); - - ISession session = connection.CreateSession(); - - IDestination destination = session.GetQueue("FOO.BAR"); - Assert.IsTrue(destination != null, "No queue available!"); - - IMessageConsumer consumer = session.CreateConsumer(destination); - - IMessageProducer producer = session.CreateProducer(destination); - - string expected = "Hello World!"; - ITextMessage request = session.CreateTextMessage(expected); - - producer.Send(request); - - ITextMessage message = (ITextMessage) consumer.Receive(); - - Assert.IsNotNull(message, "No message returned!"); - - Assert.AreEqual(expected, message.Text, "the message text"); - } - catch (Exception e) - { - Console.WriteLine("Caught: " + e); - } - } + base.SendAndSyncReceive(); } + + protected override IMessage CreateMessage(ISession session) + { + IMessage request = session.CreateTextMessage(expected); + return request; + } + + protected override void AssertValidMessage(IMessage message) + { + ITextMessage textMessage = (ITextMessage) message; + String text = textMessage.Text; + Console.WriteLine("Received message with text: " + text); + Assert.AreEqual(expected, text, "the message text"); + } + } } diff --git a/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs b/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs new file mode 100644 index 0000000000..cb5815dbe9 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs @@ -0,0 +1,127 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.IO; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace OpenWire.Client +{ + [ TestFixture ] + public class JMSPropertyTest : TestSupport + { + // standard JMS properties + string expectedText = "Hey this works!"; + string correlationID = "abc"; + ITemporaryQueue replyTo; + bool persistent = true; + byte priority = 5; + String type = "FooType"; + String groupID = "MyGroup"; + int groupSeq = 1; + + // custom properties + string customText = "Cheese"; + bool custom1 = true; + byte custom2 = 12; + short custom3 = 0x1234; + int custom4 = 0x12345678; + long custom5 = 0x1234567812345678; + char custom6 = 'J'; + + [ Test ] + public override void SendAndSyncReceive() + { + base.SendAndSyncReceive(); + } + + protected override IMessage CreateMessage(ISession session) + { + ITextMessage message = session.CreateTextMessage(expectedText); + replyTo = session.CreateTemporaryQueue(); + + // lets set the headers + message.JMSCorrelationID = correlationID; + message.JMSReplyTo = replyTo; + message.JMSPersistent = persistent; + message.JMSPriority = priority; + message.JMSType = type; + message.JMSXGroupID = groupID; + message.JMSXGroupSeq = groupSeq; + + // lets set the custom headers + message.Properties["customText"] = customText; + message.Properties["custom1"] = custom1; + message.Properties["custom2"] = custom2; + message.Properties["custom3"] = custom3; + message.Properties["custom4"] = custom4; + message.Properties["custom5"] = custom5; + message.Properties["custom6"] = custom6; + + return message; + } + + protected override void AssertValidMessage(IMessage message) + { + Assert.IsTrue(message is ITextMessage, "Did not receive a ITextMessage!"); + + Console.WriteLine("Received Message: " + message); + + ITextMessage textMessage = (ITextMessage) message; + String text = textMessage.Text; + Assert.AreEqual(expectedText, text, "the message text"); + + // compare standard JMS headers + Assert.AreEqual(correlationID, message.JMSCorrelationID, "JMSCorrelationID"); + Assert.AreEqual(replyTo, message.JMSReplyTo, "JMSReplyTo"); + Assert.AreEqual(persistent, message.JMSPersistent, "JMSPersistent"); + Assert.AreEqual(priority, message.JMSPriority, "JMSPriority"); + Assert.AreEqual(type, message.JMSType, "JMSType"); + Assert.AreEqual(groupID, message.JMSXGroupID, "JMSXGroupID"); + Assert.AreEqual(groupSeq, message.JMSXGroupSeq, "JMSXGroupSeq"); + + // compare custom headers + Assert.AreEqual(customText, message.Properties["customText"], "customText"); + Assert.AreEqual(custom1, message.Properties["custom1"], "custom1"); + Assert.AreEqual(custom2, message.Properties["custom2"], "custom2"); + Assert.AreEqual(custom3, message.Properties["custom3"], "custom3"); + Assert.AreEqual(custom4, message.Properties["custom4"], "custom4"); + // TODO + //Assert.AreEqual(custom5, message.Properties["custom5"], "custom5"); + Assert.AreEqual(custom4, message.Properties["custom6"], "custom6"); + + Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1"); + Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2"); + Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3"); + Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4"); + //Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5"); + Assert.AreEqual(custom4, message.Properties.GetChar("custom6"), "custom6"); + + // lets now look at some standard JMS headers + Console.WriteLine("JMSExpiration: " + message.JMSExpiration); + Console.WriteLine("JMSMessageId: " + message.JMSMessageId); + Console.WriteLine("JMSRedelivered: " + message.JMSRedelivered); + Console.WriteLine("JMSTimestamp: " + message.JMSTimestamp); + Console.WriteLine("JMSXDeliveryCount: " + message.JMSXDeliveryCount); + Console.WriteLine("JMSXProducerTXID: " + message.JMSXProducerTXID); + } + } +} + diff --git a/openwire-dotnet/tests/OpenWire.Client/MapMessageTest.cs b/openwire-dotnet/tests/OpenWire.Client/MapMessageTest.cs new file mode 100644 index 0000000000..4e7d68cd26 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/MapMessageTest.cs @@ -0,0 +1,65 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.IO; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace OpenWire.Client +{ + [ TestFixture ] + public class MapMessageTest : TestSupport + { + bool a = true; + byte b = 123; + char c = 'c'; + short d = 0x1234; + int e = 0x12345678; + long f = 0x1234567812345678; + string g = "Hello World!"; + + [ Test ] + public override void SendAndSyncReceive() + { + base.SendAndSyncReceive(); + } + + protected override IMessage CreateMessage(ISession session) + { + IMapMessage request = session.CreateMapMessage(); + return request; + } + + protected override void AssertValidMessage(IMessage message) + { + Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!"); + + Console.WriteLine("Received MapMessage: " + message); + + IMapMessage mapMessage = (IMapMessage) message; + + /* + String text = mapMessage.Text; + Assert.AreEqual(expected, text, "the message text"); + */ + } + + } +} diff --git a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs index 5ecaa6e1eb..e5ddcd58af 100644 --- a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs +++ b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs @@ -17,13 +17,72 @@ using System; using System.IO; +using NUnit.Framework; + using OpenWire.Client; +using OpenWire.Client.Core; -namespace OpenWire.Client { - /// - /// useful base class for test cases - /// - public abstract class TestSupport { - } +namespace OpenWire.Client +{ + + /// + /// useful base class for test cases + /// + [ TestFixture ] + public abstract class TestSupport + { + + [ Test ] + public virtual void SendAndSyncReceive() + { + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + + Assert.IsTrue(factory != null, "no factory created"); + + using (IConnection connection = factory.CreateConnection()) + { + try + { + Assert.IsTrue(connection != null, "no connection created"); + Console.WriteLine("Connected to ActiveMQ!"); + + ISession session = connection.CreateSession(); + + IDestination destination = CreateDestination(session); + Assert.IsTrue(destination != null, "No queue available!"); + + IMessageConsumer consumer = session.CreateConsumer(destination); + + IMessageProducer producer = session.CreateProducer(destination); + + IMessage request = CreateMessage(session); + + producer.Send(request); + + IMessage message = consumer.Receive(); + Assert.IsNotNull(message, "No message returned!"); + + AssertValidMessage(message); + } + catch (Exception e) + { + Console.WriteLine("Caught: " + e); + } + } + } + + protected virtual IDestination CreateDestination(ISession session) + { + string name = "Test.DotNet." + GetType().Name; + IDestination destination = session.GetQueue(name); + + Console.WriteLine("Using queue: " + destination); + return destination; + } + + protected abstract IMessage CreateMessage(ISession session); + + protected abstract void AssertValidMessage(IMessage message); + } }