working OpenWire.Net client! While not all of JMS is supported just yet, the test program shows the creation of a Connection, Session, Consumer, Producer and sending and receiving a message (using the synchronous dispatch)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-23 20:36:12 +00:00
parent 2ce853a42d
commit 33094e694a
55 changed files with 878 additions and 290 deletions

View File

@ -36,10 +36,12 @@ namespace OpenWire.Client.Commands
public const byte ID_ActiveMQObjectMessage = 26;
public override string ToString() {
return GetType().Name + "["
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -4,39 +4,75 @@ using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
public class ActiveMQTextMessage : ActiveMQMessage, ITextMessage {
public const byte ID_ActiveMQTextMessage = 28;
private String text;
public ActiveMQTextMessage() {
}
public ActiveMQTextMessage(String text) {
this.text = text;
}
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetDataStructureType() {
return ID_ActiveMQTextMessage;
}
// Properties
public string Text {
get {
if (text == null) {
// TODO parse from the content
}
return text;
namespace OpenWire.Client.Commands
{
public class ActiveMQTextMessage : ActiveMQMessage, ITextMessage
{
public const byte ID_ActiveMQTextMessage = 28;
private String text;
public ActiveMQTextMessage()
{
}
public ActiveMQTextMessage(String text)
{
this.Text = text;
}
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetDataStructureType()
{
return ID_ActiveMQTextMessage;
}
// Properties
public string Text
{
get {
if (text == null)
{
// now lets read the content
byte[] data = this.Content;
if (data != null)
{
// TODO assume that the text is ASCII
char[] chars = new char[data.Length];
for (int i = 0; i < chars.Length; i++)
{
chars[i] = (char) data[i];
}
set { this.text = value; }
}
}
text = new String(chars);
}
}
return text;
}
set {
this.text = value;
byte[] data = null;
if (text != null)
{
// TODO assume that the text is ASCII
data = new byte[text.Length];
// now lets write the bytes
char[] chars = text.ToCharArray();
for (int i = 0; i < chars.Length; i++)
{
data[i] = (byte) chars[i];
}
}
this.Content = data;
}
}
}
}

View File

@ -37,10 +37,35 @@ namespace OpenWire.Client.Commands
string value;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(Value);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is BrokerId) {
return Equals((BrokerId) that);
}
return false;
}
public virtual bool Equals(BrokerId that) {
if (! Equals(this.Value, that.Value)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " Value=" + Value
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -41,10 +41,17 @@ namespace OpenWire.Client.Commands
string brokerName;
bool slaveBroker;
public override string ToString() {
return GetType().Name + "["
+ " BrokerId=" + BrokerId
+ " BrokerURL=" + BrokerURL
+ " PeerBrokerInfos=" + PeerBrokerInfos
+ " BrokerName=" + BrokerName
+ " SlaveBroker=" + SlaveBroker
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -38,10 +38,14 @@ namespace OpenWire.Client.Commands
BrokerError exception;
ConnectionId connectionId;
public override string ToString() {
return GetType().Name + "["
+ " Exception=" + Exception
+ " ConnectionId=" + ConnectionId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,35 @@ namespace OpenWire.Client.Commands
string value;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(Value);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is ConnectionId) {
return Equals((ConnectionId) that);
}
return false;
}
public virtual bool Equals(ConnectionId that) {
if (! Equals(this.Value, that.Value)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " Value=" + Value
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -41,10 +41,17 @@ namespace OpenWire.Client.Commands
string userName;
BrokerId[] brokerPath;
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " ClientId=" + ClientId
+ " Password=" + Password
+ " UserName=" + UserName
+ " BrokerPath=" + BrokerPath
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,41 @@ namespace OpenWire.Client.Commands
long sessionId;
long value;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(ConnectionId);
answer = (answer * 37) + HashCode(SessionId);
answer = (answer * 37) + HashCode(Value);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is ConsumerId) {
return Equals((ConsumerId) that);
}
return false;
}
public virtual bool Equals(ConsumerId that) {
if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
if (! Equals(this.SessionId, that.SessionId)) return false;
if (! Equals(this.Value, that.Value)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " SessionId=" + SessionId
+ " Value=" + Value
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -49,10 +49,25 @@ namespace OpenWire.Client.Commands
BrokerId[] brokerPath;
bool networkSubscription;
public override string ToString() {
return GetType().Name + "["
+ " ConsumerId=" + ConsumerId
+ " Browser=" + Browser
+ " Destination=" + Destination
+ " PrefetchSize=" + PrefetchSize
+ " DispatchAsync=" + DispatchAsync
+ " Selector=" + Selector
+ " SubcriptionName=" + SubcriptionName
+ " NoLocal=" + NoLocal
+ " Exclusive=" + Exclusive
+ " Retroactive=" + Retroactive
+ " Priority=" + Priority
+ " BrokerPath=" + BrokerPath
+ " NetworkSubscription=" + NetworkSubscription
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
string command;
public override string ToString() {
return GetType().Name + "["
+ " Command=" + Command
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
DataStructure[] data;
public override string ToString() {
return GetType().Name + "["
+ " Data=" + Data
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
DataStructure data;
public override string ToString() {
return GetType().Name + "["
+ " Data=" + Data
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -41,10 +41,17 @@ namespace OpenWire.Client.Commands
long timeout;
BrokerId[] brokerPath;
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " Destination=" + Destination
+ " OperationType=" + OperationType
+ " Timeout=" + Timeout
+ " BrokerPath=" + BrokerPath
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -38,10 +38,14 @@ namespace OpenWire.Client.Commands
string serviceName;
string brokerName;
public override string ToString() {
return GetType().Name + "["
+ " ServiceName=" + ServiceName
+ " BrokerName=" + BrokerName
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
BrokerError exception;
public override string ToString() {
return GetType().Name + "["
+ " Exception=" + Exception
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -36,10 +36,12 @@ namespace OpenWire.Client.Commands
public const byte ID_FlushCommand = 15;
public override string ToString() {
return GetType().Name + "["
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
int result;
public override string ToString() {
return GetType().Name + "["
+ " Result=" + Result
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -38,10 +38,14 @@ namespace OpenWire.Client.Commands
ActiveMQDestination destination;
MessageAck messageAck;
public override string ToString() {
return GetType().Name + "["
+ " Destination=" + Destination
+ " MessageAck=" + MessageAck
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -42,10 +42,18 @@ namespace OpenWire.Client.Commands
string clientId;
TransactionId transactionId;
public override string ToString() {
return GetType().Name + "["
+ " Destination=" + Destination
+ " MessageId=" + MessageId
+ " MessageSequenceId=" + MessageSequenceId
+ " SubscritionName=" + SubscritionName
+ " ClientId=" + ClientId
+ " TransactionId=" + TransactionId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
string message;
public override string ToString() {
return GetType().Name + "["
+ " Message=" + Message
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,15 @@ namespace OpenWire.Client.Commands
byte type;
bool wasPrepared;
public override string ToString() {
return GetType().Name + "["
+ " TransactionId=" + TransactionId
+ " Type=" + Type
+ " WasPrepared=" + WasPrepared
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -36,10 +36,12 @@ namespace OpenWire.Client.Commands
public const byte ID_KeepAliveInfo = 10;
public override string ToString() {
return GetType().Name + "["
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -38,10 +38,38 @@ namespace OpenWire.Client.Commands
long value;
ConnectionId connectionId;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(Value);
answer = (answer * 37) + HashCode(ConnectionId);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is LocalTransactionId) {
return Equals((LocalTransactionId) that);
}
return false;
}
public virtual bool Equals(LocalTransactionId that) {
if (! Equals(this.Value, that.Value)) return false;
if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " Value=" + Value
+ " ConnectionId=" + ConnectionId
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -61,10 +61,37 @@ namespace OpenWire.Client.Commands
string userID;
bool recievedByDFBridge;
public override string ToString() {
return GetType().Name + "["
+ " ProducerId=" + ProducerId
+ " Destination=" + Destination
+ " TransactionId=" + TransactionId
+ " OriginalDestination=" + OriginalDestination
+ " MessageId=" + MessageId
+ " OriginalTransactionId=" + OriginalTransactionId
+ " GroupID=" + GroupID
+ " GroupSequence=" + GroupSequence
+ " CorrelationId=" + CorrelationId
+ " Persistent=" + Persistent
+ " Expiration=" + Expiration
+ " Priority=" + Priority
+ " ReplyTo=" + ReplyTo
+ " Timestamp=" + Timestamp
+ " Type=" + Type
+ " Content=" + Content
+ " MarshalledProperties=" + MarshalledProperties
+ " DataStructure=" + DataStructure
+ " TargetConsumerId=" + TargetConsumerId
+ " Compressed=" + Compressed
+ " RedeliveryCounter=" + RedeliveryCounter
+ " BrokerPath=" + BrokerPath
+ " Arrival=" + Arrival
+ " UserID=" + UserID
+ " RecievedByDFBridge=" + RecievedByDFBridge
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -43,10 +43,19 @@ namespace OpenWire.Client.Commands
MessageId lastMessageId;
int messageCount;
public override string ToString() {
return GetType().Name + "["
+ " Destination=" + Destination
+ " TransactionId=" + TransactionId
+ " ConsumerId=" + ConsumerId
+ " AckType=" + AckType
+ " FirstMessageId=" + FirstMessageId
+ " LastMessageId=" + LastMessageId
+ " MessageCount=" + MessageCount
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -40,10 +40,16 @@ namespace OpenWire.Client.Commands
Message message;
int redeliveryCounter;
public override string ToString() {
return GetType().Name + "["
+ " ConsumerId=" + ConsumerId
+ " Destination=" + Destination
+ " Message=" + Message
+ " RedeliveryCounter=" + RedeliveryCounter
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -40,10 +40,16 @@ namespace OpenWire.Client.Commands
long deliverySequenceId;
MessageId messageId;
public override string ToString() {
return GetType().Name + "["
+ " ConsumerId=" + ConsumerId
+ " Destination=" + Destination
+ " DeliverySequenceId=" + DeliverySequenceId
+ " MessageId=" + MessageId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,41 @@ namespace OpenWire.Client.Commands
long producerSequenceId;
long brokerSequenceId;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(ProducerId);
answer = (answer * 37) + HashCode(ProducerSequenceId);
answer = (answer * 37) + HashCode(BrokerSequenceId);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is MessageId) {
return Equals((MessageId) that);
}
return false;
}
public virtual bool Equals(MessageId that) {
if (! Equals(this.ProducerId, that.ProducerId)) return false;
if (! Equals(this.ProducerSequenceId, that.ProducerSequenceId)) return false;
if (! Equals(this.BrokerSequenceId, that.BrokerSequenceId)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " ProducerId=" + ProducerId
+ " ProducerSequenceId=" + ProducerSequenceId
+ " BrokerSequenceId=" + BrokerSequenceId
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,41 @@ namespace OpenWire.Client.Commands
long value;
long sessionId;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(ConnectionId);
answer = (answer * 37) + HashCode(Value);
answer = (answer * 37) + HashCode(SessionId);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is ProducerId) {
return Equals((ProducerId) that);
}
return false;
}
public virtual bool Equals(ProducerId that) {
if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
if (! Equals(this.Value, that.Value)) return false;
if (! Equals(this.SessionId, that.SessionId)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " Value=" + Value
+ " SessionId=" + SessionId
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,15 @@ namespace OpenWire.Client.Commands
ActiveMQDestination destination;
BrokerId[] brokerPath;
public override string ToString() {
return GetType().Name + "["
+ " ProducerId=" + ProducerId
+ " Destination=" + Destination
+ " BrokerPath=" + BrokerPath
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
DataStructure objectId;
public override string ToString() {
return GetType().Name + "["
+ " ObjectId=" + ObjectId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,15 @@ namespace OpenWire.Client.Commands
string subcriptionName;
string clientId;
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " SubcriptionName=" + SubcriptionName
+ " ClientId=" + ClientId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
short correlationId;
public override string ToString() {
return GetType().Name + "["
+ " CorrelationId=" + CorrelationId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -38,10 +38,38 @@ namespace OpenWire.Client.Commands
string connectionId;
long value;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(ConnectionId);
answer = (answer * 37) + HashCode(Value);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is SessionId) {
return Equals((SessionId) that);
}
return false;
}
public virtual bool Equals(SessionId that) {
if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
if (! Equals(this.Value, that.Value)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " Value=" + Value
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -37,10 +37,13 @@ namespace OpenWire.Client.Commands
SessionId sessionId;
public override string ToString() {
return GetType().Name + "["
+ " SessionId=" + SessionId
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -36,10 +36,12 @@ namespace OpenWire.Client.Commands
public const byte ID_ShutdownInfo = 11;
public override string ToString() {
return GetType().Name + "["
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -40,10 +40,16 @@ namespace OpenWire.Client.Commands
string selector;
string subcriptionName;
public override string ToString() {
return GetType().Name + "["
+ " ClientId=" + ClientId
+ " Destination=" + Destination
+ " Selector=" + Selector
+ " SubcriptionName=" + SubcriptionName
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -36,10 +36,32 @@ namespace OpenWire.Client.Commands
public const byte ID_TransactionId = 0;
public override int GetHashCode() {
int answer = 0;
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is TransactionId) {
return Equals((TransactionId) that);
}
return false;
}
public virtual bool Equals(TransactionId that) {
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,15 @@ namespace OpenWire.Client.Commands
TransactionId transactionId;
byte type;
public override string ToString() {
return GetType().Name + "["
+ " ConnectionId=" + ConnectionId
+ " TransactionId=" + TransactionId
+ " Type=" + Type
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,15 @@ namespace OpenWire.Client.Commands
int version;
int options;
public override string ToString() {
return GetType().Name + "["
+ " Magic=" + Magic
+ " Version=" + Version
+ " Options=" + Options
+ " ]";
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override byte GetDataStructureType() {

View File

@ -39,10 +39,41 @@ namespace OpenWire.Client.Commands
byte[] globalTransactionId;
byte[] branchQualifier;
public override int GetHashCode() {
int answer = 0;
answer = (answer * 37) + HashCode(FormatId);
answer = (answer * 37) + HashCode(GlobalTransactionId);
answer = (answer * 37) + HashCode(BranchQualifier);
return answer;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
}
public override bool Equals(object that) {
if (that is XATransactionId) {
return Equals((XATransactionId) that);
}
return false;
}
public virtual bool Equals(XATransactionId that) {
if (! Equals(this.FormatId, that.FormatId)) return false;
if (! Equals(this.GlobalTransactionId, that.GlobalTransactionId)) return false;
if (! Equals(this.BranchQualifier, that.BranchQualifier)) return false;
return true;
}
public override string ToString() {
return GetType().Name + "["
+ " FormatId=" + FormatId
+ " GlobalTransactionId=" + GlobalTransactionId
+ " BranchQualifier=" + BranchQualifier
+ " ]";
}
public override byte GetDataStructureType() {

View File

@ -15,13 +15,15 @@ namespace OpenWire.Client
private ITransport transport;
private ConnectionInfo info;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
IList sessions = new ArrayList();
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
private IList sessions = new ArrayList();
private bool transacted;
private bool connected;
private bool closed;
private AcknowledgementMode acknowledgementMode;
private long sessionCounter;
private IDictionary consumers = new Hashtable(); // TODO threadsafe
private IDictionary consumers = new Hashtable();// TODO threadsafe
public Connection(ITransport transport, ConnectionInfo info)
@ -29,6 +31,7 @@ namespace OpenWire.Client
this.transport = transport;
this.info = info;
this.transport.Command += new CommandHandler(OnCommand);
this.transport.Start();
}
@ -135,18 +138,13 @@ namespace OpenWire.Client
}
if (!connected)
{
Console.WriteLine("ConnectionId: " + info.ConnectionId.Value);
Console.WriteLine("ClientID: " + info.ClientId);
Console.WriteLine("About to send WireFormatInfo: " + wireFormatInfo);
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
transport.Oneway(wireFormatInfo);
Console.WriteLine("About to send ConnectionInfo: " + info);
// now lets send the connection and see if we get an ack/nak
SyncRequest(info);
Console.WriteLine("Received connection info response");
connected = true;
}
}
@ -158,7 +156,6 @@ namespace OpenWire.Client
/// <param name="consumer">A MessageConsumer</param>
public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer)
{
Console.WriteLine("#### Adding consumerId: " + consumerId.Value + " session: " + consumerId.SessionId + " with consumer: " + consumer);
consumers[consumerId] = consumer;
}
@ -180,22 +177,30 @@ namespace OpenWire.Client
/// <param name="command">A Command</param>
protected void OnCommand(ITransport transport, Command command)
{
if (command is MessageDispatch) {
if (command is MessageDispatch)
{
MessageDispatch dispatch = (MessageDispatch) command;
ConsumerId consumerId = dispatch.ConsumerId;
MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
if (consumer == null) {
Console.WriteLine("No such consumer active: " + consumerId);
Console.WriteLine("No such consumer active: " + consumerId.Value);
Console.WriteLine("No such consumer active: " + consumerId.SessionId);
if (consumer == null)
{
Console.WriteLine("ERROR: No such consumer active: " + consumerId);
}
else {
else
{
ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
consumer.Dispatch(message);
}
}
else {
Console.WriteLine("Unknown command: " + command);
else if (command is WireFormatInfo) {
this.brokerWireFormatInfo = (WireFormatInfo) command;
}
else if (command is BrokerInfo) {
this.brokerInfo = (BrokerInfo) command;
}
else
{
Console.WriteLine("ERROR:ÊUnknown command: " + command);
}
}

View File

@ -74,7 +74,7 @@ namespace OpenWire.Client
// Implementation methods
protected ConnectionInfo CreateConnectionInfo(string userName, string password)
protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
{
ConnectionInfo answer = new ConnectionInfo();
ConnectionId connectionId = new ConnectionId();

View File

@ -109,5 +109,18 @@ namespace OpenWire.Client.Core
}
return packetTypeStr;
}
// Helper methods
public int HashCode(object value)
{
if (value != null)
{
return value.GetHashCode();
}
else
{
return -1;
}
}
}
}

View File

@ -0,0 +1,59 @@
using System.Collections;
using OpenWire.Client.Commands;
using System;
namespace OpenWire.Client
{
/// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers
/// </summary>
public class Dispatcher
{
Queue queue = Queue.Synchronized( new Queue() );
/// <summary>
/// Method Enqueue
/// </summary>
/// <param name="message">An ActiveMQMessage</param>
public void Enqueue(ActiveMQMessage message)
{
queue.Enqueue(message);
}
/// <summary>
/// Method DequeueNoWait
/// </summary>
/// <returns>An IMessage</retutns>
public IMessage DequeueNoWait()
{
lock (queue) {
if (queue.Peek() != null) {
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
/// <param name="timeout">A long</param>
/// <returns>An IMessage</retutns>
public IMessage Dequeue(long timeout)
{
// TODO
throw new Exception("Not implemented yet");
}
/// <summary>
/// Method Dequeue
/// </summary>
/// <returns>An IMessage</retutns>
public IMessage Dequeue()
{
return (IMessage) queue.Dequeue();
}
}
}

View File

@ -4,22 +4,24 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Core
{
public delegate void CommandHandler(ITransport sender, Command command);
public delegate void ExceptionHandler(ITransport sender, Exception command);
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
public interface ITransport : IStartable, IDisposable
{
void Oneway(Command command);
public delegate void CommandHandler(ITransport sender, Command command);
public delegate void ExceptionHandler(ITransport sender, Exception command);
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
public interface ITransport : IDisposable {
void Oneway(Command command);
FutureResponse AsyncRequest(Command command);
Response Request(Command command);
event CommandHandler Command;
event ExceptionHandler Exception;
}
FutureResponse AsyncRequest(Command command);
Response Request(Command command);
event CommandHandler Command;
event ExceptionHandler Exception;
}
}

View File

@ -65,7 +65,7 @@ namespace OpenWire.Client.Core
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
//Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
Object data = dsm.CreateObject();
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
@ -102,7 +102,7 @@ namespace OpenWire.Client.Core
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
//Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
return 1 + dsm.Marshal1(this, o, bs);
}

View File

@ -21,35 +21,51 @@ namespace OpenWire.Client.Core
public class SocketTransport : ITransport
{
private readonly object transmissionLock = new object();
private readonly Socket socket;
private Socket socket;
private OpenWireFormat wireformat = new OpenWireFormat();
private readonly BinaryReader socketReader;
private readonly BinaryWriter socketWriter;
private readonly Thread readThread;
private BinaryReader socketReader;
private BinaryWriter socketWriter;
private Thread readThread;
private bool closed;
private IDictionary requestMap = new Hashtable(); // TODO threadsafe
private short nextCommandId;
private bool started;
public event CommandHandler Command;
public event ExceptionHandler Exception;
public SocketTransport(string host, int port)
{
Console.WriteLine("Opening socket to: " + host + " on port: " + port);
//Console.WriteLine("Opening socket to: " + host + " on port: " + port);
socket = Connect(host, port);
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
/*
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
*/
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
}
/// <summary>
/// Method Start
/// </summary>
public void Start()
{
if (!started)
{
started = true;
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
/*
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
*/
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
}
}
public void Oneway(Command command)
{
command.CommandId = GetNextCommandId();
@ -75,7 +91,6 @@ namespace OpenWire.Client.Core
public void Dispose()
{
Console.WriteLine("Closing the socket");
lock (transmissionLock)
{
socket.Close();
@ -87,23 +102,12 @@ namespace OpenWire.Client.Core
public void ReadLoop()
{
Console.WriteLine("Starting to read commands from ActiveMQ");
while (!closed)
{
Command command = null;
try
{
command = (Command) wireformat.Unmarshal(socketReader);
if (command != null)
{
Console.WriteLine("Received command: " + command);
if (command is RemoveInfo)
{
RemoveInfo info = (RemoveInfo) command;
Console.WriteLine("Remove CommandId: " + info.CommandId);
Console.WriteLine("Remove ObjectID: " + info.ObjectId);
}
}
}
catch (EndOfStreamException e)
{
@ -115,11 +119,15 @@ namespace OpenWire.Client.Core
// stream closed
break;
}
catch (IOException e)
{
// error, assume closing
break;
}
if (command is Response)
{
Console.WriteLine("Received response!: " + command);
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CommandId];
FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
if (future != null)
{
if (response is ExceptionResponse)
@ -142,7 +150,7 @@ namespace OpenWire.Client.Core
}
else
{
Console.WriteLine("Unknown response ID: " + response.CommandId);
Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
}
}
else
@ -153,7 +161,7 @@ namespace OpenWire.Client.Core
}
else
{
Console.WriteLine("No handler available to process command: " + command);
Console.WriteLine("ERROR: No handler available to process command: " + command);
}
}
}
@ -166,7 +174,7 @@ namespace OpenWire.Client.Core
{
lock (transmissionLock)
{
Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired);
//Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired);
wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
@ -202,3 +210,4 @@ namespace OpenWire.Client.Core
}
}
}

View File

@ -0,0 +1,7 @@
namespace OpenWire.Client
{
public interface IStartable
{
void Start();
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Threading;
using OpenWire.Client.Commands;
@ -13,6 +14,7 @@ namespace OpenWire.Client
private Session session;
private ConsumerInfo info;
private bool closed;
private Dispatcher dispatcher = new Dispatcher();
public event MessageHandler Listener;
@ -28,22 +30,25 @@ namespace OpenWire.Client
/// <param name="message">An ActiveMQMessage</param>
public void Dispatch(ActiveMQMessage message)
{
Console.WriteLine("Dispatching message to consumer: " + message);
dispatcher.Enqueue(message);
}
public IMessage Receive()
{
CheckClosed();
Thread.Sleep(60000);
// TODO
return null;
return dispatcher.Dequeue();
}
public IMessage Receive(long timeout)
{
CheckClosed();
return dispatcher.Dequeue(timeout);
}
public IMessage ReceiveNoWait()
{
CheckClosed();
// TODO
return null;
return dispatcher.DequeueNoWait();
}
public void Dispose()

View File

@ -38,9 +38,6 @@ namespace OpenWire.Client
activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination = (ActiveMQDestination) destination;
Console.WriteLine("About to send message with MessageId: " + activeMessage.MessageId);
Console.WriteLine("About to send message with ProducerId: " + activeMessage.ProducerId);
Console.WriteLine("About to send message with Destination: " + activeMessage.Destination);
session.DoSend(destination, message);
}

View File

@ -43,12 +43,12 @@ namespace OpenWire.Client
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
{
MessageAck ack = new MessageAck();
// TODO complete packet
MessageAck ack = CreateMessageAck(message);
connection.SyncRequest(ack);
}
}
public IMessageConsumer CreateConsumer(IDestination destination)
{
return CreateConsumer(destination, null);
@ -57,10 +57,22 @@ namespace OpenWire.Client
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
connection.SyncRequest(command);
MessageConsumer consumer = new MessageConsumer(this, command);
connection.AddConsumer(command.ConsumerId, consumer);
return consumer;
ConsumerId consumerId = command.ConsumerId;
try
{
MessageConsumer consumer = new MessageConsumer(this, command);
// lets register the consumer first in case we start dispatching messages immediately
connection.AddConsumer(consumerId, consumer);
connection.SyncRequest(command);
return consumer;
}
catch (Exception e)
{
connection.RemoveConsumer(consumerId);
throw e;
}
}
public IQueue GetQueue(string name)
@ -105,26 +117,21 @@ namespace OpenWire.Client
public void DisposeOf(DataStructure objectId)
{
Console.WriteLine("Disposing of session: " + objectId + " with datatype: " + objectId.GetDataStructureType());
/*
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
*/
// TODO dispose of all the session first?
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
}
public void DisposeOf(ConsumerId objectId)
{
Console.WriteLine("Disposing of consumer: " + objectId);
connection.RemoveConsumer(objectId);
/*
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
*/
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
}
protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
ConsumerId id = new ConsumerId();
@ -143,7 +150,7 @@ namespace OpenWire.Client
return answer;
}
protected ProducerInfo CreateProducerInfo(IDestination destination)
protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
{
ProducerInfo answer = new ProducerInfo();
ProducerId id = new ProducerId();
@ -158,6 +165,13 @@ namespace OpenWire.Client
return answer;
}
protected virtual MessageAck CreateMessageAck(Message message)
{
MessageAck ack = new MessageAck();
// TODO complete packet
return ack;
}
/// <summary>
/// Configures the message command
/// </summary>

View File

@ -8,25 +8,9 @@ using OpenWire.Client.Core;
namespace OpenWire.Client
{
[ TestFixture ]
public class ClientTest : TestSupport
{
[ Test ]
public void CreateOpenWireFormat()
{
OpenWireFormat format = new OpenWireFormat();
Assert.IsTrue(format != null);
}
[ Test ]
public void CreateConnectionFactory()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "created valid factory: " + factory);
}
[ Test ]
public void SendAndSyncReceive()
{
@ -34,47 +18,32 @@ namespace OpenWire.Client
Assert.IsTrue(factory != null, "no factory created");
Console.WriteLine("Worked!");
using (IConnection connection = factory.CreateConnection())
{
try
{
Assert.IsTrue(connection != null, "no connection created");
Console.WriteLine("Created a connection!");
Console.WriteLine("Connected to ActiveMQ!");
ISession session = connection.CreateSession();
Console.WriteLine("Created a session: " + session);
IDestination destination = session.GetQueue("FOO.BAR");
Assert.IsTrue(destination != null, "No queue available!");
Console.WriteLine("Using destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
Console.WriteLine("Created consumer!: " + consumer);
IMessageProducer producer = session.CreateProducer(destination);
Console.WriteLine("Created producer!: " + producer);
string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
Console.WriteLine("### About to send message: " + request);
producer.Send(request);
Console.WriteLine("### Sent message!");
ITextMessage message = (ITextMessage) consumer.Receive();
if (message == null)
{
Console.WriteLine("### No message!!");
}
else
{
Console.WriteLine("### Received message: " + message + " of type: " + message.GetType());
String actual = message.Text;
Console.WriteLine("### Message text is: " + actual);
}
Assert.IsNotNull(message, "No message returned!");
Assert.AreEqual(expected, message.Text, "the message text");
}
catch (Exception e)
{

View File

@ -0,0 +1,45 @@
using NUnit.Framework;
using OpenWire.Client.Commands;
using System.Collections;
namespace OpenWire.Client
{
[TestFixture]
public class CommandTest
{
[Test]
public void TestCommand()
{
ConsumerId value1 = new ConsumerId();
value1.ConnectionId = "abc";
value1.SessionId = 123;
value1.Value = 456;
ConsumerId value2 = new ConsumerId();
value2.ConnectionId = "abc";
value2.SessionId = 123;
value2.Value = 456;
ConsumerId value3 = new ConsumerId();
value3.ConnectionId = "abc";
value3.SessionId = 123;
value3.Value = 457;
Assert.AreEqual(value1, value2, "value1 and value2 should be equal");
Assert.AreEqual(value1.GetHashCode(), value2.GetHashCode(), "value1 and value2 hash codes should be equal");
Assert.IsTrue(!value1.Equals(value3), "value1 and value3 should not be equal");
Assert.IsTrue(!value3.Equals(value2), "value3 and value2 should not be equal");
// now lets test an IDictionary
IDictionary dictionary = new Hashtable();
dictionary[value1] = value3;
// now lets lookup with a copy
object actual = dictionary[value2];
Assert.AreEqual(value3, actual, "Should have found item in Map using value2 as a key");
}
}
}

View File

@ -3,6 +3,7 @@ using System.IO;
using OpenWire.Client;
using OpenWire.Client.Core;
using OpenWire.Client.Commands;
namespace openwire_dotnet
{
@ -32,10 +33,14 @@ namespace openwire_dotnet
IMessageProducer producer = session.CreateProducer(destination);
string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
producer.Send(request);
ITextMessage message = (ITextMessage) consumer.Receive();
Console.WriteLine("### About to receive message...");
ActiveMQTextMessage message = (ActiveMQTextMessage) consumer.Receive();
if (message == null)
{
Console.WriteLine("### No message!!");