added the standard JMS headers to IMessage together with support for custom message headers and support for temporary destinations, IMapMessage and IBytesMessage

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380652 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-24 12:02:46 +00:00
parent 172b2d11fa
commit f0300f1251
28 changed files with 2429 additions and 749 deletions

View File

@ -22,24 +22,14 @@ using OpenWire.Client.Core;
namespace OpenWire.Client.Commands namespace OpenWire.Client.Commands
{ {
public class ActiveMQBytesMessage : ActiveMQMessage public class ActiveMQBytesMessage : ActiveMQMessage, IBytesMessage
{ {
public const byte ID_ActiveMQBytesMessage = 24; public const byte ID_ActiveMQBytesMessage = 24;
public override byte GetDataStructureType()
{
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetDataStructureType() {
return ID_ActiveMQBytesMessage; return ID_ActiveMQBytesMessage;
} }
// Properties
} }
} }

View File

@ -18,12 +18,14 @@ using System;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core; using OpenWire.Client.Core;
namespace OpenWire.Client.Commands { namespace OpenWire.Client.Commands
{
/// <summary> /// <summary>
/// Summary description for ActiveMQDestination. /// Summary description for ActiveMQDestination.
/// </summary> /// </summary>
public abstract class ActiveMQDestination : AbstractCommand, IDestination { public abstract class ActiveMQDestination : AbstractCommand, IDestination
{
/** /**
* Topic Destination object * Topic Destination object
@ -89,7 +91,8 @@ namespace OpenWire.Client.Commands {
/** /**
* The Default Constructor * The Default Constructor
*/ */
protected ActiveMQDestination() { protected ActiveMQDestination()
{
} }
/** /**
@ -97,7 +100,8 @@ namespace OpenWire.Client.Commands {
* *
* @param name * @param name
*/ */
protected ActiveMQDestination(String name) { protected ActiveMQDestination(String name)
{
this.physicalName = name; this.physicalName = name;
this.advisory = name != null && name.StartsWith(ADVISORY_PREFIX); this.advisory = name != null && name.StartsWith(ADVISORY_PREFIX);
} }
@ -107,71 +111,82 @@ namespace OpenWire.Client.Commands {
/** /**
* @return Returns the advisory. * @return Returns the advisory.
*/ */
public bool IsAdvisory() { public bool IsAdvisory()
{
return advisory; return advisory;
} }
/** /**
* @param advisory The advisory to set. * @param advisory The advisory to set.
*/ */
public void SetAdvisory(bool advisory) { public void SetAdvisory(bool advisory)
{
this.advisory = advisory; this.advisory = advisory;
} }
/** /**
* @return true if this is a destination for Consumer advisories * @return true if this is a destination for Consumer advisories
*/ */
public bool IsConsumerAdvisory() { public bool IsConsumerAdvisory()
{
return IsAdvisory() && physicalName.StartsWith(CONSUMER_ADVISORY_PREFIX); return IsAdvisory() && physicalName.StartsWith(CONSUMER_ADVISORY_PREFIX);
} }
/** /**
* @return true if this is a destination for Producer advisories * @return true if this is a destination for Producer advisories
*/ */
public bool IsProducerAdvisory() { public bool IsProducerAdvisory()
{
return IsAdvisory() && physicalName.StartsWith(PRODUCER_ADVISORY_PREFIX); return IsAdvisory() && physicalName.StartsWith(PRODUCER_ADVISORY_PREFIX);
} }
/** /**
* @return true if this is a destination for Connection advisories * @return true if this is a destination for Connection advisories
*/ */
public bool IsConnectionAdvisory() { public bool IsConnectionAdvisory()
{
return IsAdvisory() && physicalName.StartsWith(CONNECTION_ADVISORY_PREFIX); return IsAdvisory() && physicalName.StartsWith(CONNECTION_ADVISORY_PREFIX);
} }
/** /**
* @return Returns the exclusive. * @return Returns the exclusive.
*/ */
public bool IsExclusive() { public bool IsExclusive()
{
return exclusive; return exclusive;
} }
/** /**
* @param exclusive The exclusive to set. * @param exclusive The exclusive to set.
*/ */
public void SetExclusive(bool exclusive) { public void SetExclusive(bool exclusive)
{
this.exclusive = exclusive; this.exclusive = exclusive;
} }
/** /**
* @return Returns the ordered. * @return Returns the ordered.
*/ */
public bool IsOrdered() { public bool IsOrdered()
{
return ordered; return ordered;
} }
/** /**
* @param ordered The ordered to set. * @param ordered The ordered to set.
*/ */
public void SetOrdered(bool ordered) { public void SetOrdered(bool ordered)
{
this.ordered = ordered; this.ordered = ordered;
} }
/** /**
* @return Returns the orderedTarget. * @return Returns the orderedTarget.
*/ */
public String GetOrderedTarget() { public String GetOrderedTarget()
{
return orderedTarget; return orderedTarget;
} }
/** /**
* @param orderedTarget The orderedTarget to set. * @param orderedTarget The orderedTarget to set.
*/ */
public void SetOrderedTarget(String orderedTarget) { public void SetOrderedTarget(String orderedTarget)
{
this.orderedTarget = orderedTarget; this.orderedTarget = orderedTarget;
} }
/** /**
@ -180,10 +195,14 @@ namespace OpenWire.Client.Commands {
* *
* @return a descriptive string for this queue or topic * @return a descriptive string for this queue or topic
*/ */
public static String Inspect(ActiveMQDestination destination) { public static String Inspect(ActiveMQDestination destination)
if (destination is ITopic) { {
if (destination is ITopic)
{
return "Topic(" + destination.ToString() + ")"; return "Topic(" + destination.ToString() + ")";
} else { }
else
{
return "Queue(" + destination.ToString() + ")"; return "Queue(" + destination.ToString() + ")";
} }
} }
@ -193,19 +212,31 @@ namespace OpenWire.Client.Commands {
* @return @throws JMSException * @return @throws JMSException
* @throws javax.jms.JMSException * @throws javax.jms.JMSException
*/ */
public static ActiveMQDestination Transform(IDestination destination) { public static ActiveMQDestination Transform(IDestination destination)
{
ActiveMQDestination result = null; ActiveMQDestination result = null;
if (destination != null) { if (destination != null)
if (destination is ActiveMQDestination) { {
if (destination is ActiveMQDestination)
{
result = (ActiveMQDestination) destination; result = (ActiveMQDestination) destination;
} else { }
if (destination is ITemporaryQueue) { else
{
if (destination is ITemporaryQueue)
{
result = new ActiveMQTempQueue(((IQueue) destination).QueueName); result = new ActiveMQTempQueue(((IQueue) destination).QueueName);
} else if (destination is ITemporaryTopic) { }
else if (destination is ITemporaryTopic)
{
result = new ActiveMQTempTopic(((ITopic) destination).TopicName); result = new ActiveMQTempTopic(((ITopic) destination).TopicName);
} else if (destination is IQueue) { }
else if (destination is IQueue)
{
result = new ActiveMQQueue(((IQueue) destination).QueueName); result = new ActiveMQQueue(((IQueue) destination).QueueName);
} else if (destination is ITopic) { }
else if (destination is ITopic)
{
result = new ActiveMQTopic(((ITopic) destination).TopicName); result = new ActiveMQTopic(((ITopic) destination).TopicName);
} }
} }
@ -219,15 +250,23 @@ namespace OpenWire.Client.Commands {
* @param pyhsicalName * @param pyhsicalName
* @return * @return
*/ */
public static ActiveMQDestination CreateDestination(int type, String pyhsicalName) { public static ActiveMQDestination CreateDestination(int type, String pyhsicalName)
{
ActiveMQDestination result = null; ActiveMQDestination result = null;
if (type == ACTIVEMQ_TOPIC) { if (type == ACTIVEMQ_TOPIC)
{
result = new ActiveMQTopic(pyhsicalName); result = new ActiveMQTopic(pyhsicalName);
} else if (type == ACTIVEMQ_TEMPORARY_TOPIC) { }
else if (type == ACTIVEMQ_TEMPORARY_TOPIC)
{
result = new ActiveMQTempTopic(pyhsicalName); result = new ActiveMQTempTopic(pyhsicalName);
} else if (type == ACTIVEMQ_QUEUE) { }
else if (type == ACTIVEMQ_QUEUE)
{
result = new ActiveMQQueue(pyhsicalName); result = new ActiveMQQueue(pyhsicalName);
} else { }
else
{
result = new ActiveMQTempQueue(pyhsicalName); result = new ActiveMQTempQueue(pyhsicalName);
} }
return result; return result;
@ -239,7 +278,8 @@ namespace OpenWire.Client.Commands {
* @param clientId * @param clientId
* @return * @return
*/ */
public static String CreateTemporaryName(String clientId) { public static String CreateTemporaryName(String clientId)
{
return TEMP_PREFIX + clientId + TEMP_POSTFIX; return TEMP_PREFIX + clientId + TEMP_POSTFIX;
} }
@ -249,15 +289,19 @@ namespace OpenWire.Client.Commands {
* @param destination * @param destination
* @return the clientId or null if not a temporary destination * @return the clientId or null if not a temporary destination
*/ */
public static String GetClientId(ActiveMQDestination destination) { public static String GetClientId(ActiveMQDestination destination)
{
String answer = null; String answer = null;
if (destination != null && destination.IsTemporary()) { if (destination != null && destination.IsTemporary())
{
String name = destination.PhysicalName; String name = destination.PhysicalName;
int start = name.IndexOf(TEMP_PREFIX); int start = name.IndexOf(TEMP_PREFIX);
if (start >= 0) { if (start >= 0)
{
start += TEMP_PREFIX.Length; start += TEMP_PREFIX.Length;
int stop = name.LastIndexOf(TEMP_POSTFIX); int stop = name.LastIndexOf(TEMP_POSTFIX);
if (stop > start && stop < name.Length) { if (stop > start && stop < name.Length)
{
answer = name.Substring(start, stop); answer = name.Substring(start, stop);
} }
} }
@ -270,8 +314,10 @@ namespace OpenWire.Client.Commands {
* @param o object to compare * @param o object to compare
* @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o
*/ */
public int CompareTo(Object o) { public int CompareTo(Object o)
if (o is ActiveMQDestination) { {
if (o is ActiveMQDestination)
{
return CompareTo((ActiveMQDestination) o); return CompareTo((ActiveMQDestination) o);
} }
return -1; return -1;
@ -283,23 +329,34 @@ namespace OpenWire.Client.Commands {
* @param that another destination to compare against * @param that another destination to compare against
* @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o * @return 1 if this is less than o else 0 if they are equal or -1 if this is less than o
*/ */
public int CompareTo(ActiveMQDestination that) { public int CompareTo(ActiveMQDestination that)
{
int answer = 0; int answer = 0;
if (physicalName != that.physicalName) { if (physicalName != that.physicalName)
if (physicalName == null) { {
if (physicalName == null)
{
return -1; return -1;
} else if (that.physicalName == null) { }
else if (that.physicalName == null)
{
return 1; return 1;
} }
answer = physicalName.CompareTo(that.physicalName); answer = physicalName.CompareTo(that.physicalName);
} }
if (answer == 0) { if (answer == 0)
if (IsTopic()) { {
if (that.IsQueue()) { if (IsTopic())
{
if (that.IsQueue())
{
return 1; return 1;
} }
} else { }
if (that.IsTopic()) { else
{
if (that.IsTopic())
{
return -1; return -1;
} }
} }
@ -315,7 +372,8 @@ namespace OpenWire.Client.Commands {
public abstract int GetDestinationType(); public abstract int GetDestinationType();
public String PhysicalName { public String PhysicalName
{
get { return this.physicalName; } get { return this.physicalName; }
set { this.physicalName = value; } set { this.physicalName = value; }
} }
@ -326,7 +384,8 @@ namespace OpenWire.Client.Commands {
* @return true/false * @return true/false
*/ */
public bool IsTemporary() { public bool IsTemporary()
{
return GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC return GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC
|| GetDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE; || GetDestinationType() == ACTIVEMQ_TEMPORARY_QUEUE;
} }
@ -337,7 +396,8 @@ namespace OpenWire.Client.Commands {
* @return true/false * @return true/false
*/ */
public bool IsTopic() { public bool IsTopic()
{
return GetDestinationType() == ACTIVEMQ_TOPIC return GetDestinationType() == ACTIVEMQ_TOPIC
|| GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC; || GetDestinationType() == ACTIVEMQ_TEMPORARY_TOPIC;
} }
@ -347,7 +407,8 @@ namespace OpenWire.Client.Commands {
* *
* @return true/false * @return true/false
*/ */
public bool IsQueue() { public bool IsQueue()
{
return !IsTopic(); return !IsTopic();
} }
@ -361,7 +422,8 @@ namespace OpenWire.Client.Commands {
* *
* @return true if this destination represents a collection of child destinations. * @return true if this destination represents a collection of child destinations.
*/ */
public bool IsComposite() { public bool IsComposite()
{
return physicalName.IndexOf(COMPOSITE_SEPARATOR) > 0; return physicalName.IndexOf(COMPOSITE_SEPARATOR) > 0;
} }
@ -400,7 +462,8 @@ namespace OpenWire.Client.Commands {
* @return string representation of this instance * @return string representation of this instance
*/ */
public override String ToString() { public override String ToString()
{
return this.physicalName; return this.physicalName;
} }
@ -408,13 +471,16 @@ namespace OpenWire.Client.Commands {
* @return hashCode for this instance * @return hashCode for this instance
*/ */
public override int GetHashCode() { public override int GetHashCode()
{
int answer = 37; int answer = 37;
if (this.physicalName != null) { if (this.physicalName != null)
{
answer = physicalName.GetHashCode(); answer = physicalName.GetHashCode();
} }
if (IsTopic()) { if (IsTopic())
{
answer ^= 0xfabfab; answer ^= 0xfabfab;
} }
return answer; return answer;
@ -427,9 +493,11 @@ namespace OpenWire.Client.Commands {
* @return true if this instance and obj are equivalent * @return true if this instance and obj are equivalent
*/ */
public override bool Equals(Object obj) { public override bool Equals(Object obj)
{
bool result = this == obj; bool result = this == obj;
if (!result && obj != null && obj is ActiveMQDestination) { if (!result && obj != null && obj is ActiveMQDestination)
{
ActiveMQDestination other = (ActiveMQDestination) obj; ActiveMQDestination other = (ActiveMQDestination) obj;
result = this.GetDestinationType() == other.GetDestinationType() result = this.GetDestinationType() == other.GetDestinationType()
&& this.physicalName.Equals(other.physicalName); && this.physicalName.Equals(other.physicalName);
@ -441,8 +509,10 @@ namespace OpenWire.Client.Commands {
/** /**
* @return true if the destination matches multiple possible destinations * @return true if the destination matches multiple possible destinations
*/ */
public bool IsWildcard() { public bool IsWildcard()
if (physicalName != null) { {
if (physicalName != null)
{
return physicalName.IndexOf(DestinationFilter.ANY_CHILD) >= 0 return physicalName.IndexOf(DestinationFilter.ANY_CHILD) >= 0
|| physicalName.IndexOf(DestinationFilter.ANY_DESCENDENT) >= 0; || physicalName.IndexOf(DestinationFilter.ANY_DESCENDENT) >= 0;
} }

View File

@ -22,24 +22,40 @@ using OpenWire.Client.Core;
namespace OpenWire.Client.Commands namespace OpenWire.Client.Commands
{ {
public class ActiveMQMapMessage : ActiveMQMessage public class ActiveMQMapMessage : ActiveMQMessage, IMapMessage
{ {
public const byte ID_ActiveMQMapMessage = 25; public const byte ID_ActiveMQMapMessage = 25;
private PrimitiveMap body;
public override byte GetDataStructureType()
// TODO generate Equals method {
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetDataStructureType() {
return ID_ActiveMQMapMessage; return ID_ActiveMQMapMessage;
} }
public IPrimitiveMap Body
{
get {
if (body == null)
{
body = PrimitiveMap.Unmarshal(Content);
}
return body;
}
}
// Properties public void BeforeMarshall(OpenWireFormat wireFormat)
{
base.BeforeMarshall(wireFormat);
if (body == null) {
Content = null;
}
else {
Content = body.Marshal();
}
}
} }
} }

View File

@ -20,49 +20,281 @@ using System.Collections;
using OpenWire.Client; using OpenWire.Client;
using OpenWire.Client.Core; using OpenWire.Client.Core;
namespace OpenWire.Client.Commands { namespace OpenWire.Client.Commands
public class ActiveMQMessage : Message, IMessage, MarshallAware { {
public class ActiveMQMessage : Message, IMessage, MarshallAware
{
public const byte ID_ActiveMQMessage = 23; public const byte ID_ActiveMQMessage = 23;
public static ActiveMQMessage Transform(IMessage message) { protected static MessagePropertyHelper propertyHelper = new MessagePropertyHelper();
private PrimitiveMap properties;
public static ActiveMQMessage Transform(IMessage message)
{
return (ActiveMQMessage) message; return (ActiveMQMessage) message;
} }
// TODO generate Equals method // TODO generate Equals method
// TODO generate GetHashCode method // TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetDataStructureType() {
public override byte GetDataStructureType()
{
return ID_ActiveMQMessage; return ID_ActiveMQMessage;
} }
public override bool IsMarshallAware() { // Properties
return true;
public IPrimitiveMap Properties
{
get {
if (properties == null)
{
properties = PrimitiveMap.Unmarshal(MarshalledProperties);
}
return properties;
}
} }
// Properties public IDestination FromDestination
public IDestination FromDestination { {
get { return Destination; } get { return Destination; }
set { this.Destination = ActiveMQDestination.Transform(value); } set { this.Destination = ActiveMQDestination.Transform(value); }
} }
public void BeforeMarshall(OpenWireFormat wireFormat) {
// IMessage interface
// JMS headers
/// <summary>
/// The correlation ID used to correlate messages with conversations or long running business processes
/// </summary>
public string JMSCorrelationID
{
get {
return CorrelationId;
}
set {
CorrelationId = value;
}
} }
public void AfterMarshall(OpenWireFormat wireFormat) { /// <summary>
/// The destination of the message
/// </summary>
public IDestination JMSDestination
{
get {
return OriginalDestination;
}
} }
public void BeforeUnmarshall(OpenWireFormat wireFormat) { /// <summary>
/// The time in milliseconds that this message should expire in
/// </summary>
public long JMSExpiration
{
get {
return Expiration;
}
set {
Expiration = value;
} }
public void AfterUnmarshall(OpenWireFormat wireFormat) {
} }
public void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data) { /// <summary>
/// The message ID which is set by the provider
/// </summary>
public string JMSMessageId
{
get {
return DataStreamMarshaller.ToString(MessageId);
}
} }
public byte[] GetMarshalledForm(OpenWireFormat wireFormat) { /// <summary>
/// Whether or not this message is persistent
/// </summary>
public bool JMSPersistent
{
get {
return Persistent;
}
set {
Persistent = value;
}
}
/// <summary>
/// The Priority on this message
/// </summary>
public byte JMSPriority
{
get {
return Priority;
}
set {
Priority = value;
}
}
/// <summary>
/// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully.
/// </summary>
public bool JMSRedelivered
{
get {
return RedeliveryCounter > 0;
}
}
/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
public IDestination JMSReplyTo
{
get {
return ReplyTo;
}
set {
ReplyTo = ActiveMQDestination.Transform(value);
}
}
/// <summary>
/// The timestamp the broker added to the message
/// </summary>
public long JMSTimestamp
{
get {
return Timestamp;
}
}
/// <summary>
/// The type name of this message
/// </summary>
public string JMSType
{
get {
return Type;
}
set {
Type = value;
}
}
// JMS Extension headers
/// <summary>
/// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully.
/// </summary>
public int JMSXDeliveryCount
{
get {
return RedeliveryCounter + 1;
}
}
/// <summary>
/// The Message Group ID used to group messages together to the same consumer for the same group ID value
/// </summary>
public string JMSXGroupID
{
get {
return GroupID;
}
set {
GroupID = value;
}
}
/// <summary>
/// The Message Group Sequence counter to indicate the position in a group
/// </summary>
public int JMSXGroupSeq
{
get {
return GroupSequence;
}
set {
GroupSequence = value;
}
}
/// <summary>
/// Returns the ID of the producers transaction
/// </summary>
public string JMSXProducerTXID
{
get {
TransactionId txnId = OriginalTransactionId;
if (txnId == null)
{
txnId = TransactionId;
}
if (txnId != null)
{
return DataStreamMarshaller.ToString(txnId);
}
return null; return null;
} }
} }
public object GetObjectProperty(string name)
{
return propertyHelper.GetObjectProperty(this, name);
}
public void SetObjectProperty(string name, object value)
{
propertyHelper.SetObjectProperty(this, name, value);
}
// MarshallAware interface
public override bool IsMarshallAware()
{
return true;
}
public void BeforeMarshall(OpenWireFormat wireFormat)
{
MarshalledProperties = null;
if (properties != null)
{
MarshalledProperties = properties.Marshal();
}
}
public void AfterMarshall(OpenWireFormat wireFormat)
{
}
public void BeforeUnmarshall(OpenWireFormat wireFormat)
{
}
public void AfterUnmarshall(OpenWireFormat wireFormat)
{
}
public void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data)
{
}
public byte[] GetMarshalledForm(OpenWireFormat wireFormat)
{
return null;
}
}
} }

View File

@ -35,19 +35,17 @@ namespace OpenWire.Client.Commands
{ {
public const byte ID_ActiveMQTempDestination = 0; public const byte ID_ActiveMQTempDestination = 0;
public ActiveMQTempDestination() : base()
{
}
public ActiveMQTempDestination(String name) : base(name)
{
}
// TODO generate Equals method public override byte GetDataStructureType()
// TODO generate GetHashCode method {
// TODO generate ToString method
public override byte GetDataStructureType() {
return ID_ActiveMQTempDestination; return ID_ActiveMQTempDestination;
} }
// Properties
} }
} }

View File

@ -22,15 +22,16 @@ using OpenWire.Client.Core;
namespace OpenWire.Client.Commands namespace OpenWire.Client.Commands
{ {
/// <summary> /// <summary>
/// Summary description for ActiveMQTempQueue. /// A Temporary Queue
/// </summary> /// </summary>
public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue public class ActiveMQTempQueue : ActiveMQTempDestination, ITemporaryQueue
{ {
public const byte ID_ActiveMQTempQueue = 102; public const byte ID_ActiveMQTempQueue = 102;
public ActiveMQTempQueue() : base() public ActiveMQTempQueue() : base()
{ {
} }
public ActiveMQTempQueue(String name) : base(name) public ActiveMQTempQueue(String name) : base(name)
{ {
} }

View File

@ -22,15 +22,16 @@ using OpenWire.Client.Core;
namespace OpenWire.Client.Commands namespace OpenWire.Client.Commands
{ {
/// <summary> /// <summary>
/// Summary description for ActiveMQTempTopic. /// A Temporary Topic
/// </summary> /// </summary>
public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic public class ActiveMQTempTopic : ActiveMQTempDestination, ITemporaryTopic
{ {
public const byte ID_ActiveMQTempTopic = 103; public const byte ID_ActiveMQTempTopic = 103;
public ActiveMQTempTopic() : base() public ActiveMQTempTopic() : base()
{ {
} }
public ActiveMQTempTopic(String name) : base(name) public ActiveMQTempTopic(String name) : base(name)
{ {
} }

View File

@ -36,27 +36,32 @@ namespace OpenWire.Client.Commands
public const byte ID_TransactionId = 0; public const byte ID_TransactionId = 0;
public override int GetHashCode() { public override int GetHashCode()
{
int answer = 0; int answer = 0;
return answer; return answer;
} }
public override bool Equals(object that) { public override bool Equals(object that)
if (that is TransactionId) { {
if (that is TransactionId)
{
return Equals((TransactionId) that); return Equals((TransactionId) that);
} }
return false; return false;
} }
public virtual bool Equals(TransactionId that) { public virtual bool Equals(TransactionId that)
{
return true; return true;
} }
public override string ToString() { public override string ToString()
{
return GetType().Name + "[" return GetType().Name + "["
+ " ]"; + " ]";
@ -64,12 +69,9 @@ namespace OpenWire.Client.Commands
public override byte GetDataStructureType() { public override byte GetDataStructureType()
{
return ID_TransactionId; return ID_TransactionId;
} }
// Properties
} }
} }

View File

@ -1,19 +1,3 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System; using System;
using System.Collections; using System.Collections;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
@ -39,6 +23,7 @@ namespace OpenWire.Client
private bool closed; private bool closed;
private AcknowledgementMode acknowledgementMode; private AcknowledgementMode acknowledgementMode;
private long sessionCounter; private long sessionCounter;
private long temporaryDestinationCounter;
private IDictionary consumers = new Hashtable(); // TODO threadsafe private IDictionary consumers = new Hashtable(); // TODO threadsafe
@ -57,6 +42,7 @@ namespace OpenWire.Client
{ {
} }
/// <summary> /// <summary>
/// Stop message delivery for this connection. /// Stop message delivery for this connection.
/// </summary> /// </summary>
@ -159,6 +145,18 @@ namespace OpenWire.Client
return answer; return answer;
} }
/// <summary>
/// Creates a new temporary destination name
/// </summary>
public String CreateTemporaryDestinationName()
{
lock (this)
{
return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
}
}
protected void CheckConnected() protected void CheckConnected()
{ {
if (closed) if (closed)

View File

@ -17,10 +17,12 @@
using System; using System;
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Text;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core; using OpenWire.Client.Core;
using OpenWire.Client.IO; using OpenWire.Client.IO;
using System.Collections;
namespace OpenWire.Client.Core namespace OpenWire.Client.Core
{ {
@ -30,6 +32,36 @@ namespace OpenWire.Client.Core
/// </summary> /// </summary>
public abstract class DataStreamMarshaller public abstract class DataStreamMarshaller
{ {
public const byte NULL = 0;
public const byte BOOLEAN_TYPE = 1;
public const byte BYTE_TYPE = 2;
public const byte CHAR_TYPE = 3;
public const byte SHORT_TYPE = 4;
public const byte INTEGER_TYPE = 5;
public const byte LONG_TYPE = 6;
public const byte DOUBLE_TYPE = 7;
public const byte FLOAT_TYPE = 8;
public const byte STRING_TYPE = 9;
public const byte BYTE_ARRAY_TYPE = 10;
private static String[] HEX_TABLE = new String[]{
"00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "0a", "0b", "0c", "0d", "0e", "0f",
"10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "1a", "1b", "1c", "1d", "1e", "1f",
"20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "2a", "2b", "2c", "2d", "2e", "2f",
"30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "3a", "3b", "3c", "3d", "3e", "3f",
"40", "41", "42", "43", "44", "45", "46", "47", "48", "49", "4a", "4b", "4c", "4d", "4e", "4f",
"50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "5a", "5b", "5c", "5d", "5e", "5f",
"60", "61", "62", "63", "64", "65", "66", "67", "68", "69", "6a", "6b", "6c", "6d", "6e", "6f",
"70", "71", "72", "73", "74", "75", "76", "77", "78", "79", "7a", "7b", "7c", "7d", "7e", "7f",
"80", "81", "82", "83", "84", "85", "86", "87", "88", "89", "8a", "8b", "8c", "8d", "8e", "8f",
"90", "91", "92", "93", "94", "95", "96", "97", "98", "99", "9a", "9b", "9c", "9d", "9e", "9f",
"a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "aa", "ab", "ac", "ad", "ae", "af",
"b0", "b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "ba", "bb", "bc", "bd", "be", "bf",
"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "ca", "cb", "cc", "cd", "ce", "cf",
"d0", "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", "da", "db", "dc", "dd", "de", "df",
"e0", "e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "ea", "eb", "ec", "ed", "ee", "ef",
"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "fa", "fb", "fc", "fd", "fe", "ff",
};
public abstract DataStructure CreateObject(); public abstract DataStructure CreateObject();
public abstract byte GetDataStructureType(); public abstract byte GetDataStructureType();
@ -155,6 +187,21 @@ namespace OpenWire.Client.Core
if (bs.ReadBoolean()) if (bs.ReadBoolean())
{ {
if (bs.ReadBoolean()) if (bs.ReadBoolean())
{
return ReadAsciiString(dataIn);
}
else
{
return ReadUTF8(dataIn);
}
}
else
{
return null;
}
}
protected virtual String ReadAsciiString(BinaryReader dataIn)
{ {
int size = ReadShort(dataIn); int size = ReadShort(dataIn);
byte[] data = new byte[size]; byte[] data = new byte[size];
@ -166,16 +213,6 @@ namespace OpenWire.Client.Core
} }
return new String(text); return new String(text);
} }
else
{
return dataIn.ReadString();
}
}
else
{
return null;
}
}
protected virtual int WriteString(String value, BooleanStream bs) protected virtual int WriteString(String value, BooleanStream bs)
{ {
@ -236,7 +273,8 @@ namespace OpenWire.Client.Core
WriteShort((short) value.Length, dataOut); WriteShort((short) value.Length, dataOut);
// now lets write the bytes // now lets write the bytes
char[] chars = value.ToCharArray(); char[] chars = value.ToCharArray();
for (int i = 0; i < chars.Length; i++) { for (int i = 0; i < chars.Length; i++)
{
WriteByte((byte) chars[i], dataOut); WriteByte((byte) chars[i], dataOut);
} }
} }
@ -280,7 +318,7 @@ namespace OpenWire.Client.Core
public static void WriteChar(char value, BinaryWriter dataOut) public static void WriteChar(char value, BinaryWriter dataOut)
{ {
dataOut.Write(SwitchEndian(value)); dataOut.Write(SwitchEndian((short) value));
} }
public static void WriteShort(short value, BinaryWriter dataOut) public static void WriteShort(short value, BinaryWriter dataOut)
@ -313,7 +351,8 @@ namespace OpenWire.Client.Core
public static long SwitchEndian(long x) public static long SwitchEndian(long x)
{ {
long answer = 0; long answer = 0;
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++)
{
long lowest = x & 0xff; long lowest = x & 0xff;
x >>= 8; x >>= 8;
answer <<= 8; answer <<= 8;
@ -528,5 +567,418 @@ namespace OpenWire.Client.Core
WriteString(o.Message, dataOut, bs); WriteString(o.Message, dataOut, bs);
} }
} }
/// <summary>
/// Marshals the primitive type map to a byte array
/// </summary>
public static byte[] MarshalPrimitiveMap(IDictionary map)
{
if (map == null)
{
return null;
}
else
{
MemoryStream memoryStream = new MemoryStream();
MarshalPrimitiveMap(map, new BinaryWriter(memoryStream));
return memoryStream.GetBuffer();
} }
} }
public static void MarshalPrimitiveMap(IDictionary map, BinaryWriter dataOut)
{
if (map == null)
{
WriteInt(-1, dataOut);
}
else
{
WriteInt(map.Count, dataOut);
foreach (DictionaryEntry entry in map)
{
String name = (String) entry.Key;
WriteUTF8(name, dataOut);
Object value = entry.Value;
MarshalPrimitive(dataOut, value);
}
}}
/// <summary>
/// Unmarshals the primitive type map from the given byte array
/// </summary>
public static IDictionary UnmarshalPrimitiveMap(byte[] data)
{
if (data == null)
{
return new Hashtable();
}
else
{
return UnmarshalPrimitiveMap(new BinaryReader(new MemoryStream(data)));
}
}
public static IDictionary UnmarshalPrimitiveMap(BinaryReader dataIn)
{
int size = ReadInt(dataIn);
if (size < 0)
{
return null;
}
else
{
IDictionary answer = new Hashtable(size);
for (int i=0; i < size; i++)
{
String name = ReadUTF8(dataIn);
answer[name] = UnmarshalPrimitive(dataIn);
}
return answer;
}
}
public static void MarshalPrimitive(BinaryWriter dataOut, Object value)
{
if (value == null)
{
WriteByte(NULL, dataOut);
}
else if (value is bool)
{
WriteByte(BOOLEAN_TYPE, dataOut);
WriteBoolean((bool) value, dataOut);
}
else if (value is byte)
{
WriteByte(BYTE_TYPE, dataOut);
WriteByte(((Byte)value), dataOut);
}
else if (value is char)
{
WriteByte(CHAR_TYPE, dataOut);
WriteChar((char) value, dataOut);
}
else if (value is short)
{
WriteByte(SHORT_TYPE, dataOut);
WriteShort((short) value, dataOut);
}
else if (value is int)
{
WriteByte(INTEGER_TYPE, dataOut);
WriteInt((int) value, dataOut);
}
else if (value is long)
{
WriteByte(LONG_TYPE, dataOut);
WriteLong((long) value, dataOut);
}
else if (value is float)
{
WriteByte(FLOAT_TYPE, dataOut);
WriteFloat((float) value, dataOut);
}
else if (value is double)
{
WriteByte(DOUBLE_TYPE, dataOut);
WriteDouble((double) value, dataOut);
}
else if (value is byte[])
{
byte[] data = (byte[]) value;
WriteByte(BYTE_ARRAY_TYPE, dataOut);
WriteInt(data.Length, dataOut);
dataOut.Write(data);
}
else if (value is string)
{
WriteByte(STRING_TYPE, dataOut);
WriteUTF8((string) value, dataOut);
}
else
{
throw new IOException("Object is not a primitive: " + value);
}
}
public static Object UnmarshalPrimitive(BinaryReader dataIn)
{
Object value=null;
switch (ReadByte(dataIn))
{
case BYTE_TYPE:
value = ReadByte(dataIn);
break;
case BOOLEAN_TYPE:
value = ReadBoolean(dataIn);
break;
case CHAR_TYPE:
value = ReadChar(dataIn);
break;
case SHORT_TYPE:
value = ReadShort(dataIn);
break;
case INTEGER_TYPE:
value = ReadInt(dataIn);
break;
case LONG_TYPE:
value = ReadLong(dataIn);
break;
case FLOAT_TYPE:
value = ReadFloat(dataIn);
break;
case DOUBLE_TYPE:
value = ReadDouble(dataIn);
break;
case BYTE_ARRAY_TYPE:
int size = ReadInt(dataIn);
byte[] data = new byte[size];
dataIn.Read(data, 0, size);
value = data;
break;
case STRING_TYPE:
value = ReadUTF8(dataIn);
break;
}
return value;
}
private static Object ReadDouble(BinaryReader dataIn)
{
// TODO: Implement this method
return dataIn.ReadDouble();
}
/// <summary>
/// Method ReadFloat
/// </summary>
/// <param name="dataIn">A BinaryReader</param>
/// <returns>An Object</retutns>
private static Object ReadFloat(BinaryReader dataIn)
{
// TODO: Implement this method
return (float) dataIn.ReadDouble();
}
private static Object ReadBoolean(BinaryReader dataIn)
{
// TODO: Implement this method
return dataIn.ReadBoolean();
}
private static void WriteDouble(double value, BinaryWriter dataOut)
{
// TODO: Implement this method
dataOut.Write(value);
}
private static void WriteFloat(float value, BinaryWriter dataOut)
{
// TODO: Implement this method
dataOut.Write(value);
}
private static void WriteBoolean(bool value, BinaryWriter dataOut)
{
// TODO: Implement this method
dataOut.Write(value);
}
public static void WriteUTF8(String text, BinaryWriter dataOut)
{
if (text != null)
{
int strlen = text.Length;
int utflen = 0;
int c, count = 0;
char[] charr = text.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
}
else
{
utflen += 2;
}
}
WriteInt(utflen, dataOut);
byte[] bytearr = new byte[utflen];
/*
byte[] bytearr = new byte[utflen + 4];
bytearr[count++] = (byte) ((utflen >>> 24) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 16) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
*/
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
bytearr[count++] = (byte) c;
}
else if (c > 0x07FF)
{
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
else
{
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
dataOut.Write(bytearr);
}
else
{
WriteInt(-1, dataOut);
}
}
public static String ReadUTF8(BinaryReader dataIn)
{
int utflen = ReadInt(dataIn);
if (utflen > -1)
{
StringBuilder str = new StringBuilder(utflen);
byte[] bytearr = new byte[utflen];
int c, char2, char3;
int count = 0;
dataIn.Read(bytearr, 0, utflen);
while (count < utflen)
{
c = bytearr[count] & 0xff;
switch (c >> 4)
{
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
/* 0xxxxxxx */
count++;
str.Append((char) c);
break;
case 12:
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 1];
if ((char2 & 0xC0) != 0x80)
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 2];
char3 = bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
break;
default :
/* 10xx xxxx, 1111 xxxx */
throw CreateDataFormatException();
}
}
// The number of chars produced may be less than utflen
return str.ToString();
}
else
{
return null;
}
}
private static Exception CreateDataFormatException()
{
// TODO: implement a better exception
return new Exception("Data format error!");
}
/// <summary>
/// Converts the object to a String
/// </summary>
public static string ToString(MessageId id)
{
return ToString(id.ProducerId) + ":" + id.ProducerSequenceId;
}
/// <summary>
/// Converts the object to a String
/// </summary>
public static string ToString(ProducerId id)
{
return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
}
/// <summary>
/// Converts the given transaction ID into a String
/// </summary>
public static String ToString(TransactionId txnId)
{
if (txnId is LocalTransactionId)
{
LocalTransactionId ltxnId = (LocalTransactionId) txnId;
return "" + ltxnId.Value;
}
else if (txnId is XATransactionId)
{
XATransactionId xaTxnId = (XATransactionId) txnId;
return "XID:" + xaTxnId.FormatId + ":" + ToHexFromBytes(xaTxnId.GlobalTransactionId) + ":" + ToHexFromBytes(xaTxnId.BranchQualifier);
}
return null;
}
/// <summary>
/// Creates the byte array into hexidecimal
/// </summary>
public static String ToHexFromBytes(byte[] data)
{
StringBuilder buffer = new StringBuilder(data.Length * 2);
for (int i = 0; i < data.Length; i++)
{
buffer.Append(HEX_TABLE[0xFF & data[i]]);
}
return buffer.ToString();
}
}
}

View File

@ -18,7 +18,7 @@ using System.Collections;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using System; using System;
namespace OpenWire.Client namespace OpenWire.Client.Core
{ {
/// <summary> /// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers /// Handles the multi-threaded dispatching between the transport and the consumers
@ -42,8 +42,10 @@ namespace OpenWire.Client
/// <returns>An IMessage</retutns> /// <returns>An IMessage</retutns>
public IMessage DequeueNoWait() public IMessage DequeueNoWait()
{ {
lock (queue) { lock (queue)
if (queue.Peek() != null) { {
if (queue.Peek() != null)
{
return (IMessage) queue.Dequeue(); return (IMessage) queue.Dequeue();
} }
} }

View File

@ -0,0 +1,57 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
using OpenWire.Client.Commands;
namespace OpenWire.Client.Core
{
public delegate object PropertyGetter(ActiveMQMessage message);
public delegate void PropertySetter(ActiveMQMessage message, object value);
public class MessagePropertyHelper
{
private IDictionary setters = new Hashtable();
private IDictionary getters = new Hashtable();
public MessagePropertyHelper()
{
// TODO find all of the JMS properties via introspection
}
public object GetObjectProperty(ActiveMQMessage message, string name) {
object getter = getters[name];
if (getter != null) {
}
return message.Properties[name];
}
public void SetObjectProperty(ActiveMQMessage message, string name, object value) {
PropertySetter setter = (PropertySetter) setters[name];
if (setter != null) {
setter(message, value);
}
else {
message.Properties[name] = value;
}
}
}
}

View File

@ -0,0 +1,241 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core
{
/// <summary>
/// A default implementation of IPrimitiveMap
/// </summary>
public class PrimitiveMap : IPrimitiveMap
{
private IDictionary dictionary = new Hashtable();
/// <summary>
/// Unmarshalls the map from the given data or if the data is null just
/// return an empty map
/// </summary>
public static PrimitiveMap Unmarshal(byte[] data)
{
PrimitiveMap answer = new PrimitiveMap();
answer.dictionary = DataStreamMarshaller.UnmarshalPrimitiveMap(data);
return answer;
}
public byte[] Marshal()
{
return DataStreamMarshaller.MarshalPrimitiveMap(dictionary);
}
public void Clear()
{
dictionary.Clear();
}
public bool Contains(Object key)
{
return dictionary.Contains(key);
}
public void Remove(Object key)
{
dictionary.Remove(key);
}
public int Count
{
get {
return dictionary.Count;
}
}
public ICollection Keys
{
get {
return dictionary.Keys;
}
}
public ICollection Values
{
get {
return dictionary.Values;
}
}
public object this[string key]
{
get {
return GetValue(key);
}
set {
CheckValidType(value);
SetValue(key, value);
}
}
public string GetString(string key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(string));
return (string) value;
}
public void SetString(string key, string value)
{
SetValue(key, value);
}
public bool GetBool(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(bool));
return (bool) value;
}
public void SetByte(String key, bool value)
{
SetValue(key, value);
}
public byte GetByte(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(byte));
return (byte) value;
}
public void SetByte(String key, byte value)
{
SetValue(key, value);
}
public char GetChar(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(char));
return (char) value;
}
public void SetChar(String key, char value)
{
SetValue(key, value);
}
public short GetShort(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(short));
return (short) value;
}
public void SetShort(String key, short value)
{
SetValue(key, value);
}
public int GetInt(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(int));
return (int) value;
}
public void SetInt(String key, int value)
{
SetValue(key, value);
}
public long GetLong(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(long));
return (long) value;
}
public void SetLong(String key, long value)
{
SetValue(key, value);
}
public float GetFloat(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(float));
return (float) value;
}
public void SetFloat(String key, float value)
{
SetValue(key, value);
}
public double GetDouble(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(double));
return (double) value;
}
public void SetDouble(String key, double value)
{
SetValue(key, value);
}
protected virtual void SetValue(String key, Object value)
{
dictionary[key] = value;
}
protected virtual Object GetValue(String key)
{
return dictionary[key];
}
protected virtual void CheckValueType(Object value, Type type)
{
if (! type.IsInstanceOfType(value))
{
throw new OpenWireException("Expected type: " + type.Name + " but was: " + value);
}
}
protected virtual void CheckValidType(Object value)
{
if (value != null)
{
Type type = value.GetType();
if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string)))
{
throw new OpenWireException("Invalid type: " + type.Name + " for value: " + value);
}
}
}
}
}

View File

@ -14,10 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client namespace OpenWire.Client
{ {
public interface IStopable /// <summary>
/// Represents a binary based message
/// </summary>
public interface IBytesMessage : IMessage
{ {
void Stop();
byte[] Content
{
get;
set;
}
} }
} }

View File

@ -27,7 +27,7 @@ namespace OpenWire.Client {
/// <summary> /// <summary>
/// Represents a connection with a message broker /// Represents a connection with a message broker
/// </summary> /// </summary>
public interface IConnection : IDisposable, IStartable, IStopable { public interface IConnection : IDisposable, IStartable {
/// <summary> /// <summary>
/// Creates a new session to work on this connection /// Creates a new session to work on this connection

View File

@ -0,0 +1,33 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client
{
/// <summary>
/// Represents a Map message which contains key and value pairs which are
/// of primitive types
/// </summary>
public interface IMapMessage : IMessage
{
IPrimitiveMap Body {
get;
}
}
}

View File

@ -17,16 +17,145 @@
using System; using System;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
namespace OpenWire.Client { namespace OpenWire.Client
{
/// <summary> /// <summary>
/// Represents a message either to be sent to a message broker or received from a message broker /// Represents a message either to be sent to a message broker or received from a message broker
/// </summary> /// </summary>
public interface IMessage { public interface IMessage
{
IDestination FromDestination { /// <summary>
/// Provides access to the message properties (headers)
/// </summary>
IPrimitiveMap Properties {
get;
}
/// <summary>
/// The correlation ID used to correlate messages from conversations or long running business processes
/// </summary>
string JMSCorrelationID
{
get;
set;
}
/// <summary>
/// The destination of the message
/// </summary>
IDestination JMSDestination
{
get;
}
/// <summary>
/// The time in milliseconds that this message should expire in
/// </summary>
long JMSExpiration
{
get;
set;
}
/// <summary>
/// The message ID which is set by the provider
/// </summary>
string JMSMessageId
{
get;
}
/// <summary>
/// Whether or not this message is persistent
/// </summary>
bool JMSPersistent
{
get;
set;
}
/// <summary>
/// The Priority on this message
/// </summary>
byte JMSPriority
{
get;
set;
}
/// <summary>
/// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully.
/// </summary>
bool JMSRedelivered
{
get; get;
} }
/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
IDestination JMSReplyTo
{
get;
set;
}
/// <summary>
/// The timestamp the broker added to the message
/// </summary>
long JMSTimestamp
{
get;
}
/// <summary>
/// The type name of this message
/// </summary>
string JMSType
{
get;
set;
}
// JMS Extension headers
/// <summary>
/// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully.
/// </summary>
int JMSXDeliveryCount
{
get;
}
/// <summary>
/// The Message Group ID used to group messages together to the same consumer for the same group ID value
/// </summary>
string JMSXGroupID
{
get;
set;
}
/// <summary>
/// The Message Group Sequence counter to indicate the position in a group
/// </summary>
int JMSXGroupSeq
{
get;
set;
}
/// <summary>
/// Returns the ID of the producers transaction
/// </summary>
string JMSXProducerTXID
{
get;
}
} }
} }

View File

@ -0,0 +1,86 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using OpenWire.Client.Commands;
namespace OpenWire.Client
{
/// <summary>
/// Represents a Map of primitive types where the keys are all string instances
/// and the values are strings or numbers.
/// </summary>
public interface IPrimitiveMap
{
void Clear();
bool Contains(object key);
void Remove(object key);
int Count
{
get;
}
ICollection Keys
{
get;
}
ICollection Values
{
get;
}
object this[string key]
{
get;
set;
}
string GetString(string key);
void SetString(string key, string value);
bool GetBool(string key);
void SetByte(string key, bool value);
byte GetByte(string key);
void SetByte(string key, byte value);
char GetChar(string key);
void SetChar(string key, char value);
short GetShort(string key);
void SetShort(string key, short value);
int GetInt(string key);
void SetInt(string key, int value);
long GetLong(string key);
void SetLong(string key, long value);
float GetFloat(string key);
void SetFloat(string key, float value);
double GetDouble(string key);
void SetDouble(string key, double value);
}
}

View File

@ -17,12 +17,17 @@
using System; using System;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
namespace OpenWire.Client { namespace OpenWire.Client
{
/// <summary> /// <summary>
/// Represents a single unit of work on an IConnection. /// Represents a single unit of work on an IConnection.
/// So the ISession can be used to perform transactional receive and sends /// So the ISession can be used to perform transactional receive and sends
/// </summary> /// </summary>
public interface ISession : IDisposable { public interface ISession : IDisposable
{
/// <summary> /// <summary>
/// Creates a producer of messages /// Creates a producer of messages
@ -59,6 +64,20 @@ namespace OpenWire.Client {
/// </summary> /// </summary>
ITopic GetTopic(string name); ITopic GetTopic(string name);
/// <summary>
/// Creates a temporary queue
/// </summary>
ITemporaryQueue CreateTemporaryQueue();
/// <summary>
/// Creates a temporary topic
/// </summary>
ITemporaryTopic CreateTemporaryTopic();
// Factory methods to create messages
/// <summary> /// <summary>
/// Creates a new message with an empty body /// Creates a new message with an empty body
/// </summary> /// </summary>
@ -73,5 +92,21 @@ namespace OpenWire.Client {
/// Creates a new text message with the given body /// Creates a new text message with the given body
/// </summary> /// </summary>
ITextMessage CreateTextMessage(string text); ITextMessage CreateTextMessage(string text);
/// <summary>
/// Creates a new Map message which contains primitive key and value pairs
/// </summary>
IMapMessage CreateMapMessage();
/// <summary>
/// Creates a new binary message
/// </summary>
IBytesMessage CreateBytesMessage();
/// <summary>
/// Creates a new binary message with the given body
/// </summary>
IBytesMessage CreateBytesMessage(byte[] body);
} }
} }

View File

@ -17,13 +17,16 @@
using System; using System;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
namespace OpenWire.Client { namespace OpenWire.Client
{
/// <summary> /// <summary>
/// Represents a text based message /// Represents a text based message
/// </summary> /// </summary>
public interface ITextMessage : IMessage { public interface ITextMessage : IMessage
{
string Text { string Text
{
get; get;
set; set;
} }

View File

@ -18,6 +18,7 @@ using System;
using System.Collections; using System.Collections;
using System.Threading; using System.Threading;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client namespace OpenWire.Client
{ {

View File

@ -57,7 +57,7 @@ namespace OpenWire.Client
ActiveMQMessage activeMessage = (ActiveMQMessage) message; ActiveMQMessage activeMessage = (ActiveMQMessage) message;
activeMessage.MessageId = id; activeMessage.MessageId = id;
activeMessage.ProducerId = info.ProducerId; activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination = (ActiveMQDestination) destination; activeMessage.Destination = ActiveMQDestination.Transform(destination);
session.DoSend(destination, message); session.DoSend(destination, message);
} }
@ -78,7 +78,6 @@ namespace OpenWire.Client
get { return timeToLive; } get { return timeToLive; }
set { this.timeToLive = value; } set { this.timeToLive = value; }
} }
public int Priority public int Priority
{ {
get { return priority; } get { return priority; }

View File

@ -124,6 +124,18 @@ namespace OpenWire.Client
return new ActiveMQTopic(name); return new ActiveMQTopic(name);
} }
public ITemporaryQueue CreateTemporaryQueue()
{
return new ActiveMQTempQueue(connection.CreateTemporaryDestinationName());
}
public ITemporaryTopic CreateTemporaryTopic()
{
return new ActiveMQTempTopic(connection.CreateTemporaryDestinationName());
}
public IMessage CreateMessage() public IMessage CreateMessage()
{ {
ActiveMQMessage answer = new ActiveMQMessage(); ActiveMQMessage answer = new ActiveMQMessage();
@ -146,6 +158,26 @@ namespace OpenWire.Client
return answer; return answer;
} }
public IMapMessage CreateMapMessage()
{
return new ActiveMQMapMessage();
}
public IBytesMessage CreateBytesMessage()
{
return new ActiveMQBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
answer.Content = body;
return answer;
}
// Implementation methods // Implementation methods
public void DoSend(IDestination destination, IMessage message) public void DoSend(IDestination destination, IMessage message)
{ {
@ -181,7 +213,7 @@ namespace OpenWire.Client
id.Value = ++consumerCounter; id.Value = ++consumerCounter;
} }
answer.ConsumerId = id; answer.ConsumerId = id;
answer.Destination = (ActiveMQDestination) destination; answer.Destination = ActiveMQDestination.Transform(destination);
answer.Selector = selector; answer.Selector = selector;
answer.PrefetchSize = prefetchSize; answer.PrefetchSize = prefetchSize;
@ -200,7 +232,7 @@ namespace OpenWire.Client
id.Value = ++producerCounter; id.Value = ++producerCounter;
} }
answer.ProducerId = id; answer.ProducerId = id;
answer.Destination = (ActiveMQDestination) destination; answer.Destination = ActiveMQDestination.Transform(destination);
return answer; return answer;
} }

View File

@ -0,0 +1,58 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client
{
[ TestFixture ]
public class BytesMessageTest : TestSupport
{
byte[] expected = {1, 2, 3, 4, 5, 6, 7, 8};
[ Test ]
public override void SendAndSyncReceive()
{
base.SendAndSyncReceive();
}
protected override IMessage CreateMessage(ISession session)
{
IBytesMessage request = session.CreateBytesMessage(expected);
return request;
}
protected override void AssertValidMessage(IMessage message)
{
Assert.IsTrue(message is IBytesMessage, "Did not receive a IBytesMessage: " + message);
Console.WriteLine("Received IBytesMessage: " + message);
IBytesMessage bytesMessage = (IBytesMessage) message;
byte[] actual = bytesMessage.Content;
Console.WriteLine("Received message with content: " + actual);
Assert.AreEqual(expected, actual, "the message content");
}
}
}

View File

@ -27,46 +27,28 @@ namespace OpenWire.Client
[ TestFixture ] [ TestFixture ]
public class ClientTest : TestSupport public class ClientTest : TestSupport
{ {
[ Test ]
public void SendAndSyncReceive()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "no factory created");
using (IConnection connection = factory.CreateConnection())
{
try
{
Assert.IsTrue(connection != null, "no connection created");
Console.WriteLine("Connected to ActiveMQ!");
ISession session = connection.CreateSession();
IDestination destination = session.GetQueue("FOO.BAR");
Assert.IsTrue(destination != null, "No queue available!");
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
string expected = "Hello World!"; string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
producer.Send(request); [ Test ]
public override void SendAndSyncReceive()
ITextMessage message = (ITextMessage) consumer.Receive();
Assert.IsNotNull(message, "No message returned!");
Assert.AreEqual(expected, message.Text, "the message text");
}
catch (Exception e)
{ {
Console.WriteLine("Caught: " + e); base.SendAndSyncReceive();
} }
protected override IMessage CreateMessage(ISession session)
{
IMessage request = session.CreateTextMessage(expected);
return request;
} }
protected override void AssertValidMessage(IMessage message)
{
ITextMessage textMessage = (ITextMessage) message;
String text = textMessage.Text;
Console.WriteLine("Received message with text: " + text);
Assert.AreEqual(expected, text, "the message text");
} }
} }
} }

View File

@ -0,0 +1,127 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client
{
[ TestFixture ]
public class JMSPropertyTest : TestSupport
{
// standard JMS properties
string expectedText = "Hey this works!";
string correlationID = "abc";
ITemporaryQueue replyTo;
bool persistent = true;
byte priority = 5;
String type = "FooType";
String groupID = "MyGroup";
int groupSeq = 1;
// custom properties
string customText = "Cheese";
bool custom1 = true;
byte custom2 = 12;
short custom3 = 0x1234;
int custom4 = 0x12345678;
long custom5 = 0x1234567812345678;
char custom6 = 'J';
[ Test ]
public override void SendAndSyncReceive()
{
base.SendAndSyncReceive();
}
protected override IMessage CreateMessage(ISession session)
{
ITextMessage message = session.CreateTextMessage(expectedText);
replyTo = session.CreateTemporaryQueue();
// lets set the headers
message.JMSCorrelationID = correlationID;
message.JMSReplyTo = replyTo;
message.JMSPersistent = persistent;
message.JMSPriority = priority;
message.JMSType = type;
message.JMSXGroupID = groupID;
message.JMSXGroupSeq = groupSeq;
// lets set the custom headers
message.Properties["customText"] = customText;
message.Properties["custom1"] = custom1;
message.Properties["custom2"] = custom2;
message.Properties["custom3"] = custom3;
message.Properties["custom4"] = custom4;
message.Properties["custom5"] = custom5;
message.Properties["custom6"] = custom6;
return message;
}
protected override void AssertValidMessage(IMessage message)
{
Assert.IsTrue(message is ITextMessage, "Did not receive a ITextMessage!");
Console.WriteLine("Received Message: " + message);
ITextMessage textMessage = (ITextMessage) message;
String text = textMessage.Text;
Assert.AreEqual(expectedText, text, "the message text");
// compare standard JMS headers
Assert.AreEqual(correlationID, message.JMSCorrelationID, "JMSCorrelationID");
Assert.AreEqual(replyTo, message.JMSReplyTo, "JMSReplyTo");
Assert.AreEqual(persistent, message.JMSPersistent, "JMSPersistent");
Assert.AreEqual(priority, message.JMSPriority, "JMSPriority");
Assert.AreEqual(type, message.JMSType, "JMSType");
Assert.AreEqual(groupID, message.JMSXGroupID, "JMSXGroupID");
Assert.AreEqual(groupSeq, message.JMSXGroupSeq, "JMSXGroupSeq");
// compare custom headers
Assert.AreEqual(customText, message.Properties["customText"], "customText");
Assert.AreEqual(custom1, message.Properties["custom1"], "custom1");
Assert.AreEqual(custom2, message.Properties["custom2"], "custom2");
Assert.AreEqual(custom3, message.Properties["custom3"], "custom3");
Assert.AreEqual(custom4, message.Properties["custom4"], "custom4");
// TODO
//Assert.AreEqual(custom5, message.Properties["custom5"], "custom5");
Assert.AreEqual(custom4, message.Properties["custom6"], "custom6");
Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1");
Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2");
Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3");
Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4");
//Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5");
Assert.AreEqual(custom4, message.Properties.GetChar("custom6"), "custom6");
// lets now look at some standard JMS headers
Console.WriteLine("JMSExpiration: " + message.JMSExpiration);
Console.WriteLine("JMSMessageId: " + message.JMSMessageId);
Console.WriteLine("JMSRedelivered: " + message.JMSRedelivered);
Console.WriteLine("JMSTimestamp: " + message.JMSTimestamp);
Console.WriteLine("JMSXDeliveryCount: " + message.JMSXDeliveryCount);
Console.WriteLine("JMSXProducerTXID: " + message.JMSXProducerTXID);
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client
{
[ TestFixture ]
public class MapMessageTest : TestSupport
{
bool a = true;
byte b = 123;
char c = 'c';
short d = 0x1234;
int e = 0x12345678;
long f = 0x1234567812345678;
string g = "Hello World!";
[ Test ]
public override void SendAndSyncReceive()
{
base.SendAndSyncReceive();
}
protected override IMessage CreateMessage(ISession session)
{
IMapMessage request = session.CreateMapMessage();
return request;
}
protected override void AssertValidMessage(IMessage message)
{
Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!");
Console.WriteLine("Received MapMessage: " + message);
IMapMessage mapMessage = (IMapMessage) message;
/*
String text = mapMessage.Text;
Assert.AreEqual(expected, text, "the message text");
*/
}
}
}

View File

@ -17,13 +17,72 @@
using System; using System;
using System.IO; using System.IO;
using OpenWire.Client; using NUnit.Framework;
namespace OpenWire.Client { using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client
{
/// <summary> /// <summary>
/// useful base class for test cases /// useful base class for test cases
/// </summary> /// </summary>
public abstract class TestSupport { [ TestFixture ]
public abstract class TestSupport
{
[ Test ]
public virtual void SendAndSyncReceive()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "no factory created");
using (IConnection connection = factory.CreateConnection())
{
try
{
Assert.IsTrue(connection != null, "no connection created");
Console.WriteLine("Connected to ActiveMQ!");
ISession session = connection.CreateSession();
IDestination destination = CreateDestination(session);
Assert.IsTrue(destination != null, "No queue available!");
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
IMessage request = CreateMessage(session);
producer.Send(request);
IMessage message = consumer.Receive();
Assert.IsNotNull(message, "No message returned!");
AssertValidMessage(message);
}
catch (Exception e)
{
Console.WriteLine("Caught: " + e);
}
}
}
protected virtual IDestination CreateDestination(ISession session)
{
string name = "Test.DotNet." + GetType().Name;
IDestination destination = session.GetQueue(name);
Console.WriteLine("Using queue: " + destination);
return destination;
}
protected abstract IMessage CreateMessage(ISession session);
protected abstract void AssertValidMessage(IMessage message);
} }
} }