From 8ba8da23883d57d93df61d9731a445f90d53b4a0 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 23 Feb 2006 18:16:15 +0000 Subject: [PATCH] updated working .Net code which is capable of creating connections, sessions, producers and consumers - not quite completed the consumer side yet but we can send messages now! :) git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380183 13f79535-47bb-0310-9956-ffa450edef68 --- .../Commands/ActiveMQMessage.cs | 8 +- .../OpenWire.Client/Commands/ActiveMQQueue.cs | 60 +- .../Commands/ActiveMQTempQueue.cs | 60 +- .../Commands/ActiveMQTempTopic.cs | 59 +- .../OpenWire.Client/Commands/ActiveMQTopic.cs | 60 +- .../OpenWire.Client/Commands/BaseCommand.cs | 81 +- .../src/OpenWire.Client/Connection.cs | 313 +++--- .../src/OpenWire.Client/ConnectionFactory.cs | 176 ++-- .../OpenWire.Client/Core/AbstractCommand.cs | 188 ++-- .../src/OpenWire.Client/Core/BooleanStream.cs | 194 ++-- .../src/OpenWire.Client/Core/Command.cs | 37 +- .../Core/DataStreamMarshaller.cs | 899 ++++++++++-------- .../OpenWire.Client/Core/FutureResponse.cs | 107 ++- .../OpenWire.Client/Core/OpenWireFormat.cs | 279 +++--- .../OpenWire.Client/Core/SocketTransport.cs | 309 +++--- .../src/OpenWire.Client/IMessageConsumer.cs | 39 +- .../IO/BaseCommandMarshaller.cs | 4 +- .../IO/BrokerInfoMarshaller.cs | 2 +- .../IO/ConnectionInfoMarshaller.cs | 2 +- .../IO/ConsumerInfoMarshaller.cs | 10 +- .../IO/DataArrayResponseMarshaller.cs | 2 +- .../IO/DestinationInfoMarshaller.cs | 6 +- .../IO/IntegerResponseMarshaller.cs | 4 +- .../IO/JournalTransactionMarshaller.cs | 4 +- .../IO/MessageAckMarshaller.cs | 8 +- .../IO/MessageDispatchMarshaller.cs | 4 +- .../OpenWire.Client/IO/MessageMarshaller.cs | 18 +- .../IO/ProducerInfoMarshaller.cs | 2 +- .../OpenWire.Client/IO/ResponseMarshaller.cs | 4 +- .../IO/TransactionInfoMarshaller.cs | 4 +- .../IO/WireFormatInfoMarshaller.cs | 8 +- .../IO/XATransactionIdMarshaller.cs | 8 +- .../src/OpenWire.Client/MessageConsumer.cs | 93 +- .../src/OpenWire.Client/MessageProducer.cs | 74 +- .../src/OpenWire.Client/Session.cs | 260 +++-- .../tests/OpenWire.Client/ClientTest.cs | 111 ++- .../tests/OpenWire.Client/EndianTest.cs | 60 ++ .../tests/OpenWire.Client/TestMain.cs | 59 ++ 38 files changed, 2166 insertions(+), 1450 deletions(-) create mode 100644 openwire-dotnet/tests/OpenWire.Client/EndianTest.cs create mode 100644 openwire-dotnet/tests/OpenWire.Client/TestMain.cs diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs index 5d583dc44e..d71716c14d 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs @@ -22,13 +22,13 @@ namespace OpenWire.Client.Commands { public override bool IsMarshallAware() { - return true; - } + return true; + } // Properties public IDestination FromDestination { - get { return Destination; } - set { this.Destination = ActiveMQDestination.Transform(value); } + get { return Destination; } + set { this.Destination = ActiveMQDestination.Transform(value); } } public void BeforeMarshall(OpenWireFormat wireFormat) { diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs index c367491c46..b1e2e06b85 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs @@ -3,28 +3,40 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - /// - /// Summary description for ActiveMQQueue. - /// - public class ActiveMQQueue : ActiveMQDestination, IQueue { - public const byte ID_ActiveMQQueue = 100; - - public ActiveMQQueue() : base() { - } - public ActiveMQQueue(String name) : base(name) { - } - - public String QueueName { - get { return PhysicalName; } - } - - public override int GetDestinationType() { - return ACTIVEMQ_QUEUE; - } - - public override ActiveMQDestination CreateDestination(String name) { - return new ActiveMQQueue(name); - } - } +namespace OpenWire.Client.Commands +{ + /// + /// Summary description for ActiveMQQueue. + /// + public class ActiveMQQueue : ActiveMQDestination, IQueue + { + public const byte ID_ActiveMQQueue = 100; + + public ActiveMQQueue() : base() + { + } + public ActiveMQQueue(String name) : base(name) + { + } + + public String QueueName + { + get { return PhysicalName; } + } + + public override byte GetDataStructureType() + { + return ID_ActiveMQQueue; + } + + public override int GetDestinationType() + { + return ACTIVEMQ_QUEUE; + } + + public override ActiveMQDestination CreateDestination(String name) + { + return new ActiveMQQueue(name); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs index da25a2c895..c4baa8f2cd 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs @@ -3,28 +3,40 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - /// - /// Summary description for ActiveMQTempQueue. - /// - public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue { - public const byte ID_ActiveMQTempQueue = 102; - - public ActiveMQTempQueue() : base() { - } - public ActiveMQTempQueue(String name) : base(name) { - } - - public String GetQueueName() { - return PhysicalName; - } - - public override int GetDestinationType() { - return ACTIVEMQ_QUEUE; - } - - public override ActiveMQDestination CreateDestination(String name) { - return new ActiveMQTempQueue(name); - } - } +namespace OpenWire.Client.Commands +{ + /// + /// Summary description for ActiveMQTempQueue. + /// + public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue + { + public const byte ID_ActiveMQTempQueue = 102; + + public ActiveMQTempQueue() : base() + { + } + public ActiveMQTempQueue(String name) : base(name) + { + } + + public String GetQueueName() + { + return PhysicalName; + } + + public override byte GetDataStructureType() + { + return ID_ActiveMQTempQueue; + } + + public override int GetDestinationType() + { + return ACTIVEMQ_QUEUE; + } + + public override ActiveMQDestination CreateDestination(String name) + { + return new ActiveMQTempQueue(name); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs index d330cd30da..89a7831431 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs @@ -3,27 +3,40 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - /// - /// Summary description for ActiveMQTempTopic. - /// - public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic { - public const byte ID_ActiveMQTempTopic = 103; - - public ActiveMQTempTopic() : base() { - } - public ActiveMQTempTopic(String name) : base(name) { - } - - public String GetTopicName() { - return PhysicalName; - } - public override int GetDestinationType() { - return ACTIVEMQ_TOPIC; - } - - public override ActiveMQDestination CreateDestination(String name) { - return new ActiveMQTempTopic(name); - } - } +namespace OpenWire.Client.Commands +{ + /// + /// Summary description for ActiveMQTempTopic. + /// + public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic + { + public const byte ID_ActiveMQTempTopic = 103; + + public ActiveMQTempTopic() : base() + { + } + public ActiveMQTempTopic(String name) : base(name) + { + } + + public String GetTopicName() + { + return PhysicalName; + } + + public override byte GetDataStructureType() + { + return ID_ActiveMQTempTopic; + } + + public override int GetDestinationType() + { + return ACTIVEMQ_TOPIC; + } + + public override ActiveMQDestination CreateDestination(String name) + { + return new ActiveMQTempTopic(name); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs index 46dcddd55e..6330cd5f18 100755 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs @@ -3,28 +3,40 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - /// - /// Summary description for ActiveMQTopic. - /// - public class ActiveMQTopic : ActiveMQDestination, ITopic { - public const byte ID_ActiveMQTopic = 101; - - public ActiveMQTopic() : base() { - } - public ActiveMQTopic(String name) : base(name) { - } - - public String TopicName { - get { return PhysicalName; } - } - - public override int GetDestinationType() { - return ACTIVEMQ_TOPIC; - } - - public override ActiveMQDestination CreateDestination(String name) { - return new ActiveMQTopic(name); - } - } +namespace OpenWire.Client.Commands +{ + /// + /// Summary description for ActiveMQTopic. + /// + public class ActiveMQTopic : ActiveMQDestination, ITopic + { + public const byte ID_ActiveMQTopic = 101; + + public ActiveMQTopic() : base() + { + } + public ActiveMQTopic(String name) : base(name) + { + } + + public String TopicName + { + get { return PhysicalName; } + } + + public override byte GetDataStructureType() + { + return ID_ActiveMQTopic; + } + + public override int GetDestinationType() + { + return ACTIVEMQ_TOPIC; + } + + public override ActiveMQDestination CreateDestination(String name) + { + return new ActiveMQTopic(name); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs b/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs index 8a1a19d489..9328c147a4 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/BaseCommand.cs @@ -13,51 +13,38 @@ using System.Collections; using OpenWire.Client; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands { - - public class BaseCommand : AbstractCommand { - public const byte ID_BaseCommand = 0; - - short commandId; - bool responseRequired; - - - public override int GetHashCode() { - return commandId; - } - - public override bool Equals(Object that) { - if (that is BaseCommand) { - BaseCommand thatCommand = (BaseCommand) that; - return this.GetDataStructureType() == thatCommand.GetDataStructureType() - && this.CommandId == thatCommand.CommandId; - } - return false; - } - - public override String ToString() { - string answer = GetDataStructureTypeAsString(GetDataStructureType()); - if (answer.Length == 0) { - answer = base.ToString(); - } - return answer + ": id = " + CommandId; - } - - public override byte GetDataStructureType() { - return ID_BaseCommand; - } - - - // Properties - - public short CommandId { - get { return commandId; } - set { this.commandId = value; } - } - - public bool ResponseRequired { - get { return responseRequired; } - set { this.responseRequired = value; } - } - } +namespace OpenWire.Client.Commands +{ + + public abstract class BaseCommand : AbstractCommand + { + + public override int GetHashCode() + { + return (CommandId * 37) + GetDataStructureType(); + } + + public override bool Equals(Object that) + { + if (that is BaseCommand) + { + BaseCommand thatCommand = (BaseCommand) that; + return this.GetDataStructureType() == thatCommand.GetDataStructureType() + && this.CommandId == thatCommand.CommandId; + } + return false; + } + + public override String ToString() + { + string answer = GetDataStructureTypeAsString(GetDataStructureType()); + if (answer.Length == 0) + { + answer = base.ToString(); + } + return answer + ": id = " + CommandId; + } + + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index f81ff481b3..58f731b6ac 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -3,117 +3,214 @@ using System.Collections; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client { +namespace OpenWire.Client +{ + /// + /// Represents a connection with a message broker + /// + public class Connection : IConnection + { + static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' }; + + private ITransport transport; + private ConnectionInfo info; + private WireFormatInfo wireFormatInfo = new WireFormatInfo(); + IList sessions = new ArrayList(); + private bool transacted; + private bool connected; + private bool closed; + private AcknowledgementMode acknowledgementMode; + private long sessionCounter; + private IDictionary consumers = new Hashtable(); // TODO threadsafe + + + public Connection(ITransport transport, ConnectionInfo info) + { + this.transport = transport; + this.info = info; + this.transport.Command += new CommandHandler(OnCommand); + } + + /// - /// Represents a connection with a message broker + /// Creates a new session to work on this connection /// - public class Connection : IConnection { - - private ConnectionInfo info; - private ITransport transport; - IList sessions = new ArrayList(); - private bool transacted; - private bool connected; - private bool closed; - private AcknowledgementMode acknowledgementMode; - private long sessionCounter; - - public Connection(ITransport transport, ConnectionInfo info) { - this.transport = transport; - this.info = info; + public ISession CreateSession() + { + return CreateSession(transacted, acknowledgementMode); + } + + /// + /// Creates a new session to work on this connection + /// + public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) + { + CheckConnected(); + SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode); + SyncRequest(info); + Session session = new Session(this, info); + sessions.Add(session); + return session; + } + + public void Dispose() + { + foreach (Session session in sessions) + { + session.Dispose(); + } + sessions.Clear(); + transport.Dispose(); + closed = true; + } + + // Properties + + public ITransport ITransport + { + get { return transport; } + set { this.transport = value; } + } + + public bool Transacted + { + get { return transacted; } + set { this.transacted = value; } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return acknowledgementMode; } + set { this.acknowledgementMode = value; } + } + + public string ClientId + { + get { return info.ClientId; } + set { + if (connected) + { + throw new OpenWireException("You cannot change the ClientId once the Connection is connected"); } - - /// - /// Creates a new session to work on this connection - /// - public ISession CreateSession() { - return CreateSession(transacted, acknowledgementMode); + info.ClientId = value; + } + } + + // Implementation methods + + /// + /// Performs a synchronous request-response with the broker + /// + public Response SyncRequest(Command command) + { + Response response = transport.Request(command); + if (response is ExceptionResponse) + { + ExceptionResponse exceptionResponse = (ExceptionResponse) response; + // TODO include stack trace + throw new OpenWireException("Request failed: " + exceptionResponse); + } + return response; + } + + + protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) + { + SessionInfo answer = new SessionInfo(); + SessionId sessionId = new SessionId(); + sessionId.ConnectionId = info.ConnectionId.Value; + lock (this) + { + sessionId.Value = ++sessionCounter; + } + answer.SessionId = sessionId; + return answer; + } + + protected void CheckConnected() + { + if (closed) + { + throw new ConnectionClosedException(); + } + if (!connected) + { + Console.WriteLine("ConnectionId: " + info.ConnectionId.Value); + Console.WriteLine("ClientID: " + info.ClientId); + + Console.WriteLine("About to send WireFormatInfo: " + wireFormatInfo); + // lets configure the wire format + wireFormatInfo.Magic = CreateMagicBytes(); + wireFormatInfo.Version = 1; + transport.Oneway(wireFormatInfo); + + Console.WriteLine("About to send ConnectionInfo: " + info); + SyncRequest(info); + Console.WriteLine("Received connection info response"); + connected = true; + } + } + + /// + /// Register a new consumer + /// + /// A ConsumerId + /// A MessageConsumer + public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer) + { + Console.WriteLine("#### Adding consumerId: " + consumerId.Value + " session: " + consumerId.SessionId + " with consumer: " + consumer); + consumers[consumerId] = consumer; + } + + + /// + /// Remove a consumer + /// + /// A ConsumerId + public void RemoveConsumer(ConsumerId consumerId) + { + consumers[consumerId] = null; + } + + + /// + /// Handle incoming commands + /// + /// An ITransport + /// A Command + protected void OnCommand(ITransport transport, Command command) + { + if (command is MessageDispatch) { + MessageDispatch dispatch = (MessageDispatch) command; + ConsumerId consumerId = dispatch.ConsumerId; + MessageConsumer consumer = (MessageConsumer) consumers[consumerId]; + if (consumer == null) { + Console.WriteLine("No such consumer active: " + consumerId); + Console.WriteLine("No such consumer active: " + consumerId.Value); + Console.WriteLine("No such consumer active: " + consumerId.SessionId); } - - /// - /// Creates a new session to work on this connection - /// - public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) { - CheckConnected(); - SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode); - SyncRequest(info); - Session session = new Session(this, info); - sessions.Add(session); - return session; + else { + ActiveMQMessage message = (ActiveMQMessage) dispatch.Message; + consumer.Dispatch(message); } - - public void Dispose() { - foreach (Session session in sessions) { - session.Dispose(); - } - sessions.Clear(); - transport.Dispose(); - closed = true; - } - - // Properties - - public ITransport ITransport { - get { return transport; } - set { this.transport = value; } - } - - public bool Transacted { - get { return transacted; } - set { this.transacted = value; } - } - - public AcknowledgementMode AcknowledgementMode { - get { return acknowledgementMode; } - set { this.acknowledgementMode = value; } - } - - public string ClientId { - get { return info.ClientId; } - set { - if (connected) { - throw new OpenWireException("You cannot change the ClientId once the Connection is connected"); - } - info.ClientId = value; - } - } - - // Implementation methods - - /// - /// Performs a synchronous request-response with the broker - /// - public Response SyncRequest(Command command) { - CheckConnected(); - Response response = ITransport.Request(command); - if (response is ExceptionResponse) { - ExceptionResponse exceptionResponse = (ExceptionResponse) response; - // TODO include stack trace - throw new OpenWireException("Request failed: " + exceptionResponse); - } - return response; - } - - - protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) { - SessionInfo answer = new SessionInfo(); - SessionId sessionId = new SessionId(); - sessionId.ConnectionId = info.ConnectionId.Value; - lock (this) { - sessionId.Value = ++sessionCounter; - } - answer.SessionId = sessionId; - return answer; - } - - protected void CheckConnected() { - if (closed) { - throw new ConnectionClosedException(); - } - if (!connected) { - SyncRequest(info); - connected = true; - } - } - } + } + else { + Console.WriteLine("Unknown command: " + command); + } + } + + /// + /// Method CreateMagicBytes + /// + /// A byte[] + private byte[] CreateMagicBytes() + { + byte[] answer = new byte[MAGIC.Length]; + for (int i = 0; i < answer.Length; i++) + { + answer[i] = (byte) MAGIC[i]; + } + return answer; + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs b/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs index 9195b9eee8..7fe5503012 100755 --- a/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs +++ b/openwire-dotnet/src/OpenWire.Client/ConnectionFactory.cs @@ -3,82 +3,102 @@ using System.Collections; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client { - /// - /// Represents a connection with a message broker - /// - public class ConnectionFactory : IConnectionFactory { - private string host = "localhost"; - private int port = 61616; - private string userName; - private string password; - private string clientId; - - public ConnectionFactory() { - } - - public ConnectionFactory(string host, int port) { - this.host = host; - this.port = port; - } - - public IConnection CreateConnection() { - return CreateConnection(userName, password); - } - - public IConnection CreateConnection(string userName, string password) { - ConnectionInfo info = CreateConnectionInfo(userName, password); - ITransport transport = CreateITransport(); - Connection connection = new Connection(transport, info); - connection.ClientId = clientId; - return connection; - } - - // Properties - - public string Host { - get { return host; } - set { host = value; } - } - - public int Port { - get { return port; } - set { port = value; } - } - - public string UserName { - get { return userName; } - set { userName = value; } - } - - public string Password { - get { return password; } - set { password = value; } - } - - public string ClientId { - get { return clientId; } - set { clientId = value; } - } - - // Implementation methods - - protected ConnectionInfo CreateConnectionInfo(string userName, string password) { - ConnectionInfo answer = new ConnectionInfo(); - ConnectionId connectionId = new ConnectionId(); - connectionId.Value = CreateNewConnectionID(); - answer.ConnectionId = connectionId; - answer.UserName = userName; - answer.Password = password; - return answer; - } - - protected string CreateNewConnectionID() { - return Guid.NewGuid().ToString(); - } - - protected ITransport CreateITransport() { - return new SocketTransport(host, port); - } - } +namespace OpenWire.Client +{ + /// + /// Represents a connection with a message broker + /// + public class ConnectionFactory : IConnectionFactory + { + private string host = "localhost"; + private int port = 61616; + private string userName; + private string password; + private string clientId; + + public ConnectionFactory() + { + } + + public ConnectionFactory(string host, int port) + { + this.host = host; + this.port = port; + } + + public IConnection CreateConnection() + { + return CreateConnection(userName, password); + } + + public IConnection CreateConnection(string userName, string password) + { + ConnectionInfo info = CreateConnectionInfo(userName, password); + ITransport transport = CreateTransport(); + Connection connection = new Connection(transport, info); + connection.ClientId = info.ClientId; + return connection; + } + + // Properties + + public string Host + { + get { return host; } + set { host = value; } + } + + public int Port + { + get { return port; } + set { port = value; } + } + + public string UserName + { + get { return userName; } + set { userName = value; } + } + + public string Password + { + get { return password; } + set { password = value; } + } + + public string ClientId + { + get { return clientId; } + set { clientId = value; } + } + + // Implementation methods + + protected ConnectionInfo CreateConnectionInfo(string userName, string password) + { + ConnectionInfo answer = new ConnectionInfo(); + ConnectionId connectionId = new ConnectionId(); + connectionId.Value = CreateNewGuid(); + + answer.ConnectionId = connectionId; + answer.UserName = userName; + answer.Password = password; + answer.ClientId = clientId; + if (clientId == null) + { + answer.ClientId = CreateNewGuid(); + } + return answer; + } + + protected string CreateNewGuid() + { + return Guid.NewGuid().ToString(); + } + + protected ITransport CreateTransport() + { + return new SocketTransport(host, port); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs b/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs index 578c307005..5f45447c18 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/AbstractCommand.cs @@ -3,85 +3,111 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { - /// - /// Summary description for AbstractCommand. - /// - public abstract class AbstractCommand : Command { - - protected AbstractCommand() { - } - - public virtual byte GetDataStructureType() { - return 0; - } - - public virtual bool IsMarshallAware() { - return false; - } - - public static String GetDataStructureTypeAsString(int type) { - String packetTypeStr = ""; - switch (type) { - case ActiveMQMessage.ID_ActiveMQMessage : - packetTypeStr = "ACTIVEMQ_MESSAGE"; - break; - case ActiveMQTextMessage.ID_ActiveMQTextMessage : - packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE"; - break; - case ActiveMQObjectMessage.ID_ActiveMQObjectMessage: - packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE"; - break; - case ActiveMQBytesMessage.ID_ActiveMQBytesMessage : - packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE"; - break; - case ActiveMQStreamMessage.ID_ActiveMQStreamMessage : - packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE"; - break; - case ActiveMQMapMessage.ID_ActiveMQMapMessage : - packetTypeStr = "ACTIVEMQ_MAP_MESSAGE"; - break; - case MessageAck.ID_MessageAck : - packetTypeStr = "ACTIVEMQ_MSG_ACK"; - break; - case Response.ID_Response : - packetTypeStr = "RESPONSE"; - break; - case ConsumerInfo.ID_ConsumerInfo : - packetTypeStr = "CONSUMER_INFO"; - break; - case ProducerInfo.ID_ProducerInfo : - packetTypeStr = "PRODUCER_INFO"; - break; - case TransactionInfo.ID_TransactionInfo : - packetTypeStr = "TRANSACTION_INFO"; - break; - case BrokerInfo.ID_BrokerInfo : - packetTypeStr = "BROKER_INFO"; - break; - case ConnectionInfo.ID_ConnectionInfo : - packetTypeStr = "CONNECTION_INFO"; - break; - case SessionInfo.ID_SessionInfo : - packetTypeStr = "SESSION_INFO"; - break; - case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo : - packetTypeStr = "DURABLE_UNSUBSCRIBE"; - break; - case IntegerResponse.ID_IntegerResponse : - packetTypeStr = "INT_RESPONSE_RECEIPT_INFO"; - break; - case WireFormatInfo.ID_WireFormatInfo : - packetTypeStr = "WIRE_FORMAT_INFO"; - break; - case RemoveInfo.ID_RemoveInfo : - packetTypeStr = "REMOVE_INFO"; - break; - case KeepAliveInfo.ID_KeepAliveInfo : - packetTypeStr = "KEEP_ALIVE"; - break; - } - return packetTypeStr; - } - } +namespace OpenWire.Client.Core +{ + /// + /// Summary description for AbstractCommand. + /// + public abstract class AbstractCommand : Command + { + private short commandId; + private bool responseRequired; + + + protected AbstractCommand() + { + } + + public virtual byte GetDataStructureType() + { + return 0; + } + + public virtual bool IsMarshallAware() + { + return false; + } + + + + // Properties + + public short CommandId + { + get { return commandId; } + set { this.commandId = value; } + } + + public bool ResponseRequired + { + get { return responseRequired; } + set { this.responseRequired = value; } + } + + public static String GetDataStructureTypeAsString(int type) + { + String packetTypeStr = ""; + switch (type) + { + case ActiveMQMessage.ID_ActiveMQMessage : + packetTypeStr = "ACTIVEMQ_MESSAGE"; + break; + case ActiveMQTextMessage.ID_ActiveMQTextMessage : + packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE"; + break; + case ActiveMQObjectMessage.ID_ActiveMQObjectMessage: + packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE"; + break; + case ActiveMQBytesMessage.ID_ActiveMQBytesMessage : + packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE"; + break; + case ActiveMQStreamMessage.ID_ActiveMQStreamMessage : + packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE"; + break; + case ActiveMQMapMessage.ID_ActiveMQMapMessage : + packetTypeStr = "ACTIVEMQ_MAP_MESSAGE"; + break; + case MessageAck.ID_MessageAck : + packetTypeStr = "ACTIVEMQ_MSG_ACK"; + break; + case Response.ID_Response : + packetTypeStr = "RESPONSE"; + break; + case ConsumerInfo.ID_ConsumerInfo : + packetTypeStr = "CONSUMER_INFO"; + break; + case ProducerInfo.ID_ProducerInfo : + packetTypeStr = "PRODUCER_INFO"; + break; + case TransactionInfo.ID_TransactionInfo : + packetTypeStr = "TRANSACTION_INFO"; + break; + case BrokerInfo.ID_BrokerInfo : + packetTypeStr = "BROKER_INFO"; + break; + case ConnectionInfo.ID_ConnectionInfo : + packetTypeStr = "CONNECTION_INFO"; + break; + case SessionInfo.ID_SessionInfo : + packetTypeStr = "SESSION_INFO"; + break; + case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo : + packetTypeStr = "DURABLE_UNSUBSCRIBE"; + break; + case IntegerResponse.ID_IntegerResponse : + packetTypeStr = "INT_RESPONSE_RECEIPT_INFO"; + break; + case WireFormatInfo.ID_WireFormatInfo : + packetTypeStr = "WIRE_FORMAT_INFO"; + break; + case RemoveInfo.ID_RemoveInfo : + packetTypeStr = "REMOVE_INFO"; + break; + case KeepAliveInfo.ID_KeepAliveInfo : + packetTypeStr = "KEEP_ALIVE"; + break; + } + return packetTypeStr; + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs b/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs index e5c13a0c42..8546d3855b 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/BooleanStream.cs @@ -5,89 +5,115 @@ using OpenWire.Client.Commands; using OpenWire.Client.Core; using OpenWire.Client.IO; -namespace OpenWire.Client.Core { - /// - /// Represents a stream of boolean flags - /// - public class BooleanStream { - byte[] data = new byte[48]; - short arrayLimit; - short arrayPos; - byte bytePos; - - public bool ReadBoolean() { - byte b = data[arrayPos]; - bool rc = ((b >> bytePos) & 0x01) != 0; - bytePos++; - if (bytePos >= 8) { - bytePos = 0; - arrayPos++; - } - return rc; - } - - public void WriteBoolean(bool value) { - if (bytePos == 0) { - arrayLimit++; - if (arrayLimit >= data.Length) { - // re-grow the array. - byte[] d = new byte[data.Length * 2]; - for (int i = 0; i < data.Length; i++) { - d[i] = data[i]; - } - data = d; - } - } - if (value) { - data[arrayPos] |= (byte) (0x01 << bytePos); - } - bytePos++; - if (bytePos >= 8) { - bytePos = 0; - arrayPos++; - } - } - - public void Marshal(BinaryWriter dataOut) { - if (arrayLimit < 64) { - dataOut.Write((byte) arrayLimit); - } else if (arrayLimit < 256) { // max value of unsigned byte - dataOut.Write((byte) 0xC0); - dataOut.Write((byte) arrayLimit); - } else { - dataOut.Write((byte) 0xE0); - dataOut.Write((short) arrayLimit); - } - - dataOut.Write(data, 0, arrayLimit); - Clear(); - } - - public void Unmarshal(BinaryReader dataIn) { - arrayLimit = dataIn.ReadByte(); - if ((arrayLimit & 0xE0) != 0) { - arrayLimit = dataIn.ReadInt16(); - } else if ((arrayLimit & 0xC0) != 0) { - arrayLimit = (short) (dataIn.ReadByte() & 0xFF); - } - if (data.Length < arrayLimit) { - data = new byte[arrayLimit]; - } - dataIn.Read(data, 0, arrayLimit); - Clear(); - } - - public void Clear() { - arrayPos = 0; - bytePos = 0; - } - - public int MarshalledSize() { - if (arrayLimit < 64) { - return 1 + arrayLimit; - } else { - return 2 + arrayLimit; - } - } +namespace OpenWire.Client.Core +{ + /// + /// Represents a stream of boolean flags + /// + public class BooleanStream + { + byte[] data = new byte[48]; + short arrayLimit; + short arrayPos; + byte bytePos; + + public bool ReadBoolean() + { + byte b = data[arrayPos]; + bool rc = ((b >> bytePos) & 0x01) != 0; + bytePos++; + if (bytePos >= 8) + { + bytePos = 0; + arrayPos++; + } + return rc; } + + public void WriteBoolean(bool value) + { + if (bytePos == 0) + { + arrayLimit++; + if (arrayLimit >= data.Length) + { + // re-grow the array. + byte[] d = new byte[data.Length * 2]; + for (int i = 0; i < data.Length; i++) + { + d[i] = data[i]; + } + data = d; + } + } + if (value) + { + data[arrayPos] |= (byte) (0x01 << bytePos); + } + bytePos++; + if (bytePos >= 8) + { + bytePos = 0; + arrayPos++; + } + } + + public void Marshal(BinaryWriter dataOut) + { + if (arrayLimit < 64) + { + dataOut.Write((byte) arrayLimit); + } + else if (arrayLimit < 256) + { // max value of unsigned byte + dataOut.Write((byte) 0xC0); + dataOut.Write((byte) arrayLimit); + } + else + { + dataOut.Write((byte) 0xE0); + DataStreamMarshaller.WriteShort(arrayLimit, dataOut); + } + + dataOut.Write(data, 0, arrayLimit); + Clear(); + } + + public void Unmarshal(BinaryReader dataIn) + { + arrayLimit = DataStreamMarshaller.ReadByte(dataIn); + if ((arrayLimit & 0xE0) != 0) + { + arrayLimit = DataStreamMarshaller.ReadShort(dataIn); + } + else if ((arrayLimit & 0xC0) != 0) + { + arrayLimit = (short) (DataStreamMarshaller.ReadByte(dataIn) & 0xFF); + } + if (data.Length < arrayLimit) + { + data = new byte[arrayLimit]; + } + dataIn.Read(data, 0, arrayLimit); + Clear(); + } + + public void Clear() + { + arrayPos = 0; + bytePos = 0; + } + + public int MarshalledSize() + { + if (arrayLimit < 64) + { + return 1 + arrayLimit; + } + else + { + return 2 + arrayLimit; + } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/Command.cs b/openwire-dotnet/src/OpenWire.Client/Core/Command.cs index 83dfa4a90d..efc43c6bf5 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/Command.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/Command.cs @@ -1,22 +1,23 @@ using System; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { - /// - /// An OpenWire command - /// - public interface Command : DataStructure { - - /* TODO - short CommandId { - get; - set; - } - - bool ResponseRequired { - get; - set; - } - */ - } +namespace OpenWire.Client.Core +{ + /// + /// An OpenWire command + /// + public interface Command : DataStructure + { + short CommandId + { + get; + set; + } + + bool ResponseRequired + { + get; + set; + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs index a3fca25083..c29572566e 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs @@ -1,401 +1,516 @@ using System; using System.IO; +using System.Net; using OpenWire.Client.Commands; using OpenWire.Client.Core; using OpenWire.Client.IO; -namespace OpenWire.Client.Core { - /// - /// A base class with useful implementation inheritence methods - /// for creating marshallers of the OpenWire protocol - /// - public abstract class DataStreamMarshaller { - - public abstract DataStructure CreateObject(); - public abstract byte GetDataStructureType(); - - public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) { - return 0; - } - public virtual void Marshal2( - OpenWireFormat wireFormat, - Object o, - BinaryWriter dataOut, - BooleanStream bs) { - } - - public virtual void Unmarshal( - OpenWireFormat wireFormat, - Object o, - BinaryReader dataIn, - BooleanStream bs) { - } - - public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs) { - if (o == 0L) { - bs.WriteBoolean(false); - bs.WriteBoolean(false); - return 0; - } else { - ulong ul = (ulong) o; - if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L) { - bs.WriteBoolean(false); - bs.WriteBoolean(true); - return 2; - } else if ((ul & 0xFFFFFFFF00000000ul) == 0L) { - bs.WriteBoolean(true); - bs.WriteBoolean(false); - return 4; - } else { - bs.WriteBoolean(true); - bs.WriteBoolean(true); - return 8; - } - } - } - - public virtual void Marshal2Long( - OpenWireFormat wireFormat, - long o, - BinaryWriter dataOut, - BooleanStream bs) { - if (bs.ReadBoolean()) { - if (bs.ReadBoolean()) { - dataOut.Write(o); - } else { - dataOut.Write((int) o); - } - } else { - if (bs.ReadBoolean()) { - dataOut.Write((short) o); - } - } - } - public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs) { - if (bs.ReadBoolean()) { - if (bs.ReadBoolean()) { - return dataIn.ReadInt64(); - } else { - return dataIn.ReadInt32(); - } - } else { - if (bs.ReadBoolean()) { - return dataIn.ReadInt16(); - } else { - return 0; - } - } - } - - protected virtual DataStructure UnmarshalNestedObject( - OpenWireFormat wireFormat, - BinaryReader dataIn, - BooleanStream bs) { - return wireFormat.UnmarshalNestedObject(dataIn, bs); - } - - protected virtual int Marshal1NestedObject( - OpenWireFormat wireFormat, - DataStructure o, - BooleanStream bs) { - return wireFormat.Marshal1NestedObject(o, bs); - } - - protected virtual void Marshal2NestedObject( - OpenWireFormat wireFormat, - DataStructure o, - BinaryWriter dataOut, - BooleanStream bs) { - wireFormat.Marshal2NestedObject(o, dataOut, bs); - } - - protected virtual DataStructure UnmarshalCachedObject( - OpenWireFormat wireFormat, - BinaryReader dataIn, - BooleanStream bs) { - /* - if (wireFormat.isCacheEnabled()) { - if (bs.ReadBoolean()) { - short index = dataIn.ReadInt16(); - DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs); - wireFormat.setInUnmarshallCache(index, value); - return value; - } else { - short index = dataIn.ReadInt16(); - return wireFormat.getFromUnmarshallCache(index); - } - } else { - return wireFormat.UnmarshalNestedObject(dataIn, bs); - } - */ - return wireFormat.UnmarshalNestedObject(dataIn, bs); - } - - protected virtual int Marshal1CachedObject( - OpenWireFormat wireFormat, - DataStructure o, - BooleanStream bs) { - /* - if (wireFormat.isCacheEnabled()) { - Short index = wireFormat.getMarshallCacheIndex(o); - bs.WriteBoolean(index == null); - if (index == null) { - int rc = wireFormat.Marshal1NestedObject(o, bs); - wireFormat.addToMarshallCache(o); - return 2 + rc; - } else { - return 2; - } - } else { - return wireFormat.Marshal1NestedObject(o, bs); - } - */ - return wireFormat.Marshal1NestedObject(o, bs); - } - - protected virtual void Marshal2CachedObject( - OpenWireFormat wireFormat, - DataStructure o, - BinaryWriter dataOut, - BooleanStream bs) { - /* - if (wireFormat.isCacheEnabled()) { - Short index = wireFormat.getMarshallCacheIndex(o); - if (bs.ReadBoolean()) { - dataOut.Write((short)index.shortValue()); - wireFormat.Marshal2NestedObject(o, dataOut, bs); - } else { - dataOut.Write((short)index.shortValue()); - } - } else { - wireFormat.Marshal2NestedObject(o, dataOut, bs); - } - */ - wireFormat.Marshal2NestedObject(o, dataOut, bs); - } - - - - protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs) { - if (bs.ReadBoolean()) { - if (bs.ReadBoolean()) { - int size = dataIn.ReadInt16(); - byte[] data = new byte[size]; - dataIn.Read(data, 0, size); - char[] text = new char[size]; - for (int i = 0; i < size; i++) { - text[i] = (char) data[i]; - } - return new String(text); - } else { - return dataIn.ReadString(); - } - } else { - return null; - } - } - - protected virtual int WriteString(String value, BooleanStream bs) { - bs.WriteBoolean(value != null); - if (value != null) { - int strlen = value.Length; - int utflen = 0; - int c = 0; - bool isOnlyAscii = true; - char[] charr = value.ToCharArray(); - for (int i = 0; i < strlen; i++) { - c = charr[i]; - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - isOnlyAscii = false; - } else { - isOnlyAscii = false; - utflen += 2; - } - } - - if (utflen >= Int16.MaxValue) - throw new IOException("Encountered a String value that is too long to encode."); - - bs.WriteBoolean(isOnlyAscii); - return utflen + 2; - } else { - return 0; - } - } - - protected virtual void WriteString(String value, BinaryWriter dataOut, BooleanStream bs) { - if (bs.ReadBoolean()) { - // If we verified it only holds ascii values - if (bs.ReadBoolean()) { - dataOut.Write((short) value.Length); - dataOut.Write(value); - } else { - dataOut.Write(value); - } - } - } - - protected virtual int MarshalObjectArray( - OpenWireFormat wireFormat, - DataStructure[] objects, - BooleanStream bs) { - if (objects != null) { - int rc = 0; - bs.WriteBoolean(true); - rc += 2; - for (int i = 0; i < objects.Length; i++) { - rc += Marshal1NestedObject(wireFormat, objects[i], bs); - } - return rc; - } else { - bs.WriteBoolean(false); - return 0; - } - } - - protected virtual void MarshalObjectArray( - OpenWireFormat wireFormat, - DataStructure[] objects, - BinaryWriter dataOut, - BooleanStream bs) { - if (bs.ReadBoolean()) { - dataOut.Write((short) objects.Length); - for (int i = 0; i < objects.Length; i++) { - Marshal2NestedObject(wireFormat, objects[i], dataOut, bs); - } - } - } - - protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag) { - if (flag) { - int size = dataIn.ReadInt32(); - return dataIn.ReadBytes(size); - } else { - return null; - } - } - - protected virtual byte[] ReadBytes(BinaryReader dataIn) { - int size = dataIn.ReadInt32(); - return dataIn.ReadBytes(size); - } - - protected virtual byte[] ReadBytes(BinaryReader dataIn, int size) { - return dataIn.ReadBytes(size); - } - - protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut) { - dataOut.Write(command.Length); - dataOut.Write(command); - } - - protected virtual BrokerError UnmarshalBrokerError( - OpenWireFormat wireFormat, - BinaryReader dataIn, - BooleanStream bs) { - if (bs.ReadBoolean()) { - String clazz = ReadString(dataIn, bs); - String message = ReadString(dataIn, bs); - - BrokerError answer = new BrokerError(); - answer.ExceptionClass = clazz; - answer.Message = message; - return answer; - } else { - return null; - } - } - - protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs) { - if (o == null) { - bs.WriteBoolean(false); - return 0; - } else { - int rc = 0; - bs.WriteBoolean(true); - rc += WriteString(o.ExceptionClass, bs); - rc += WriteString(o.Message, bs); - return rc; - } - } - - protected void MarshalBrokerError( - OpenWireFormat wireFormat, - BrokerError o, - BinaryWriter dataOut, - BooleanStream bs) { - if (bs.ReadBoolean()) { - WriteString(o.ExceptionClass, dataOut, bs); - WriteString(o.Message, dataOut, bs); - } - } - /* - protected virtual ActiveMQDestination ReadDestination(BinaryReader dataIn) { - return (ActiveMQDestination) CommandMarshallerRegistry.ReadCommand(dataIn); - } - - protected virtual void WriteDestination(ActiveMQDestination command, BinaryWriter dataOut) { - CommandMarshallerRegistry.WriteCommand(command, dataOut); - } - - protected virtual BrokerId[] ReadBrokerIds(BinaryReader dataIn) { - int size = dataIn.ReadInt32(); - BrokerId[] answer = new BrokerId[size]; - for (int i = 0; i < size; i++) { - answer[i] = (BrokerId) CommandMarshallerRegistry.BrokerIdMarshaller.ReadCommand(dataIn); - } - return answer; - } - - protected virtual void WriteBrokerIds(BrokerId[] commands, BinaryWriter dataOut) { - int size = commands.Length; - dataOut.Write(size); - for (int i = 0; i < size; i++) { - CommandMarshallerRegistry.BrokerIdMarshaller.WriteCommand(commands[i], dataOut); - } - } - - - protected virtual BrokerInfo[] ReadBrokerInfos(BinaryReader dataIn) { - int size = dataIn.ReadInt32(); - BrokerInfo[] answer = new BrokerInfo[size]; - for (int i = 0; i < size; i++) { - answer[i] = (BrokerInfo) CommandMarshallerRegistry - .BrokerInfoMarshaller - .ReadCommand(dataIn); - } - return answer; - } - - protected virtual void WriteBrokerInfos(BrokerInfo[] commands, BinaryWriter dataOut) { - int size = commands.Length; - dataOut.Write(size); - for (int i = 0; i < size; i++) { - CommandMarshallerRegistry.BrokerInfoMarshaller.WriteCommand(commands[i], dataOut); - } - } - - - protected virtual DataStructure[] ReadDataStructures(BinaryReader dataIn) { - int size = dataIn.ReadInt32(); - DataStructure[] answer = new DataStructure[size]; - for (int i = 0; i < size; i++) { - answer[i] = (DataStructure) CommandMarshallerRegistry.ReadCommand(dataIn); - } - return answer; - } - - protected virtual void WriteDataStructures(DataStructure[] commands, BinaryWriter dataOut) { - int size = commands.Length; - dataOut.Write(size); - for (int i = 0; i < size; i++) { - CommandMarshallerRegistry.WriteCommand((Command) commands[i], dataOut); - } - } - */ +namespace OpenWire.Client.Core +{ + /// + /// A base class with useful implementation inheritence methods + /// for creating marshallers of the OpenWire protocol + /// + public abstract class DataStreamMarshaller + { + + public abstract DataStructure CreateObject(); + public abstract byte GetDataStructureType(); + + public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) + { + return 0; } + public virtual void Marshal2( + OpenWireFormat wireFormat, + Object o, + BinaryWriter dataOut, + BooleanStream bs) + { + } + + public virtual void Unmarshal( + OpenWireFormat wireFormat, + Object o, + BinaryReader dataIn, + BooleanStream bs) + { + } + + + protected virtual DataStructure UnmarshalNestedObject( + OpenWireFormat wireFormat, + BinaryReader dataIn, + BooleanStream bs) + { + return wireFormat.UnmarshalNestedObject(dataIn, bs); + } + + protected virtual int Marshal1NestedObject( + OpenWireFormat wireFormat, + DataStructure o, + BooleanStream bs) + { + return wireFormat.Marshal1NestedObject(o, bs); + } + + protected virtual void Marshal2NestedObject( + OpenWireFormat wireFormat, + DataStructure o, + BinaryWriter dataOut, + BooleanStream bs) + { + wireFormat.Marshal2NestedObject(o, dataOut, bs); + } + + protected virtual DataStructure UnmarshalCachedObject( + OpenWireFormat wireFormat, + BinaryReader dataIn, + BooleanStream bs) + { + /* + if (wireFormat.isCacheEnabled()) { + if (bs.ReadBoolean()) { + short index = dataInReadShort(dataIn)Int16(); + DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs); + wireFormat.setInUnmarshallCache(index, value); + return value; + } else { + short index = ReadShort(dataIn); + return wireFormat.getFromUnmarshallCache(index); + } + } else { + return wireFormat.UnmarshalNestedObject(dataIn, bs); + } + */ + return wireFormat.UnmarshalNestedObject(dataIn, bs); + } + + protected virtual int Marshal1CachedObject( + OpenWireFormat wireFormat, + DataStructure o, + BooleanStream bs) + { + /* + if (wireFormat.isCacheEnabled()) { + Short index = wireFormat.getMarshallCacheIndex(o); + bs.WriteBoolean(index == null); + if (index == null) { + int rc = wireFormat.Marshal1NestedObject(o, bs); + wireFormat.addToMarshallCache(o); + return 2 + rc; + } else { + return 2; + } + } else { + return wireFormat.Marshal1NestedObject(o, bs); + } + */ + return wireFormat.Marshal1NestedObject(o, bs); + } + + protected virtual void Marshal2CachedObject( + OpenWireFormat wireFormat, + DataStructure o, + BinaryWriter dataOut, + BooleanStream bs) + { + /* + if (wireFormat.isCacheEnabled()) { + Short index = wireFormat.getMarshallCacheIndex(o); + if (bs.ReadBoolean()) { + WriteShort(index.shortValue(), dataOut); + wireFormat.Marshal2NestedObject(o, dataOut, bs); + } else { + WriteShort(index.shortValue(), dataOut); + } + } else { + wireFormat.Marshal2NestedObject(o, dataOut, bs); + } + */ + wireFormat.Marshal2NestedObject(o, dataOut, bs); + } + + + + protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs) + { + if (bs.ReadBoolean()) + { + if (bs.ReadBoolean()) + { + int size = ReadShort(dataIn); + byte[] data = new byte[size]; + dataIn.Read(data, 0, size); + char[] text = new char[size]; + for (int i = 0; i < size; i++) + { + text[i] = (char) data[i]; + } + return new String(text); + } + else + { + return dataIn.ReadString(); + } + } + else + { + return null; + } + } + + protected virtual int WriteString(String value, BooleanStream bs) + { + bs.WriteBoolean(value != null); + if (value != null) + { + int strlen = value.Length; + + // TODO until we get UTF8 working, lets just force ASCII + bs.WriteBoolean(true); + return strlen + 2; + + + /* + int utflen = 0; + int c = 0; + bool isOnlyAscii = true; + char[] charr = value.ToCharArray(); + for (int i = 0; i < strlen; i++) + { + c = charr[i]; + if ((c >= 0x0001) && (c <= 0x007F)) + { + utflen++; + } + else if (c > 0x07FF) + { + utflen += 3; + isOnlyAscii = false; + } + else + { + isOnlyAscii = false; + utflen += 2; + } + } + + if (utflen >= Int16.MaxValue) + throw new IOException("Encountered a String value that is too long to encode."); + + bs.WriteBoolean(isOnlyAscii); + return utflen + 2; + */ + } + else + { + return 0; + } + } + + public static void WriteString(String value, BinaryWriter dataOut, BooleanStream bs) + { + if (bs.ReadBoolean()) + { + // If we verified it only holds ascii values + if (bs.ReadBoolean()) + { + WriteShort((short) value.Length, dataOut); + // now lets write the bytes + char[] chars = value.ToCharArray(); + for (int i = 0; i < chars.Length; i++) { + WriteByte((byte) chars[i], dataOut); + } + } + else + { + // TODO how should we properly write a String so that Java will grok it??? + dataOut.Write(value); + } + } + } + + public static byte ReadByte(BinaryReader dataIn) + { + return dataIn.ReadByte(); + } + + public static char ReadChar(BinaryReader dataIn) + { + return (char) ReadShort(dataIn); + } + + public static short ReadShort(BinaryReader dataIn) + { + return SwitchEndian(dataIn.ReadInt16()); + } + + public static int ReadInt(BinaryReader dataIn) + { + return SwitchEndian(dataIn.ReadInt32()); + } + + public static long ReadLong(BinaryReader dataIn) + { + return SwitchEndian(dataIn.ReadInt64()); + } + + public static void WriteByte(byte value, BinaryWriter dataOut) + { + dataOut.Write(value); + } + + public static void WriteChar(char value, BinaryWriter dataOut) + { + dataOut.Write(SwitchEndian(value)); + } + + public static void WriteShort(short value, BinaryWriter dataOut) + { + dataOut.Write(SwitchEndian(value)); + } + + public static void WriteInt(int value, BinaryWriter dataOut) + { + dataOut.Write(SwitchEndian(value)); + } + + /// + /// Switches from one endian to the other + /// + /// An int + /// An int + public static int SwitchEndian(int x) + { + return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24)); + } + + public static short SwitchEndian(short x) + { + int low = x & 0xff; + int high = x & 0xff00; + return(short)(high >> 8 | low << 8); + } + + public static long SwitchEndian(long x) + { + long answer = 0; + for (int i = 0; i < 8; i++) { + long lowest = x & 0xff; + x >>= 8; + answer <<= 8; + answer += lowest; + } + return answer; + } + + public static void WriteLong(long value, BinaryWriter dataOut) + { + dataOut.Write(IPAddress.HostToNetworkOrder(value)); + } + + public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs) + { + if (o == 0L) + { + bs.WriteBoolean(false); + bs.WriteBoolean(false); + return 0; + } + else + { + ulong ul = (ulong) o; + if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L) + { + bs.WriteBoolean(false); + bs.WriteBoolean(true); + return 2; + } + else if ((ul & 0xFFFFFFFF00000000ul) == 0L) + { + bs.WriteBoolean(true); + bs.WriteBoolean(false); + return 4; + } + else + { + bs.WriteBoolean(true); + bs.WriteBoolean(true); + return 8; + } + } + } + + public virtual void Marshal2Long( + OpenWireFormat wireFormat, + long o, + BinaryWriter dataOut, + BooleanStream bs) + { + if (bs.ReadBoolean()) + { + if (bs.ReadBoolean()) + { + WriteLong(o, dataOut); + } + else + { + WriteInt((int) o, dataOut); + } + } + else + { + if (bs.ReadBoolean()) + { + WriteShort((short) o, dataOut); + } + } + } + public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs) + { + if (bs.ReadBoolean()) + { + if (bs.ReadBoolean()) + { + return ReadLong(dataIn); + } + else + { + return ReadInt(dataIn); + } + } + else + { + if (bs.ReadBoolean()) + { + return ReadShort(dataIn); + } + else + { + return 0; + } + } + } + protected virtual int MarshalObjectArray( + OpenWireFormat wireFormat, + DataStructure[] objects, + BooleanStream bs) + { + if (objects != null) + { + int rc = 0; + bs.WriteBoolean(true); + rc += 2; + for (int i = 0; i < objects.Length; i++) + { + rc += Marshal1NestedObject(wireFormat, objects[i], bs); + } + return rc; + } + else + { + bs.WriteBoolean(false); + return 0; + } + } + + protected virtual void MarshalObjectArray( + OpenWireFormat wireFormat, + DataStructure[] objects, + BinaryWriter dataOut, + BooleanStream bs) + { + if (bs.ReadBoolean()) + { + WriteShort((short) objects.Length, dataOut); + for (int i = 0; i < objects.Length; i++) + { + Marshal2NestedObject(wireFormat, objects[i], dataOut, bs); + } + } + } + + protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag) + { + if (flag) + { + int size = ReadInt(dataIn); + return dataIn.ReadBytes(size); + } + else + { + return null; + } + } + + protected virtual byte[] ReadBytes(BinaryReader dataIn) + { + int size = ReadInt(dataIn); + return dataIn.ReadBytes(size); + } + + protected virtual byte[] ReadBytes(BinaryReader dataIn, int size) + { + return dataIn.ReadBytes(size); + } + + protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut) + { + WriteInt(command.Length, dataOut); + dataOut.Write(command); + } + + protected virtual BrokerError UnmarshalBrokerError( + OpenWireFormat wireFormat, + BinaryReader dataIn, + BooleanStream bs) + { + if (bs.ReadBoolean()) + { + String clazz = ReadString(dataIn, bs); + String message = ReadString(dataIn, bs); + + BrokerError answer = new BrokerError(); + answer.ExceptionClass = clazz; + answer.Message = message; + return answer; + } + else + { + return null; + } + } + + protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs) + { + if (o == null) + { + bs.WriteBoolean(false); + return 0; + } + else + { + int rc = 0; + bs.WriteBoolean(true); + rc += WriteString(o.ExceptionClass, bs); + rc += WriteString(o.Message, bs); + return rc; + } + } + + protected void MarshalBrokerError( + OpenWireFormat wireFormat, + BrokerError o, + BinaryWriter dataOut, + BooleanStream bs) + { + if (bs.ReadBoolean()) + { + WriteString(o.ExceptionClass, dataOut, bs); + WriteString(o.Message, dataOut, bs); + } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs b/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs index 6eb1f67440..195f92dc1b 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/FutureResponse.cs @@ -4,52 +4,67 @@ using System.Threading; using OpenWire.Client; using OpenWire.Client.Commands; -namespace OpenWire.Client.Core { - /// - /// Handles asynchronous responses - /// - public class FutureResponse : IAsyncResult { - - private Response response; - private Mutex asyncWaitHandle = new Mutex(); - private Object semaphore = new Object(); - private int maxWait = 3000; - private bool isCompleted; - - public WaitHandle AsyncWaitHandle { - get { return asyncWaitHandle; } +namespace OpenWire.Client.Core +{ + /// + /// Handles asynchronous responses + /// + public class FutureResponse : IAsyncResult + { + + private Response response; + private Mutex asyncWaitHandle = new Mutex(); + private Object semaphore = new Object(); + private int maxWait = 3000; + private bool isCompleted; + + public WaitHandle AsyncWaitHandle + { + get { return asyncWaitHandle; } + } + + public object AsyncState + { + get { return response; } + set { Response = (Response) value; } + } + + public bool IsCompleted + { + get { return isCompleted; } + } + + public bool CompletedSynchronously + { + get { return false; } + } + + public Response Response + { + // Blocks the caller until a value has been set + get { + while (response == null) + { + try { + lock (semaphore) + { + Monitor.Wait(semaphore, maxWait); + } + } + catch (Exception e) { + Console.WriteLine("Caught while waiting on monitor: " + e); + } } - - public object AsyncState { - get { return response; } - set { Response = (Response) value; } + return response; + } + set { + lock (semaphore) + { + response = value; + isCompleted = true; + Monitor.PulseAll(semaphore); } - - public bool IsCompleted { - get { return isCompleted; } - } - - public bool CompletedSynchronously { - get { return false; } - } - - public Response Response { - // Blocks the caller until a value has been set - get { - lock (semaphore) { - while (response == null) { - Monitor.Wait(semaphore, maxWait); - } - return response; - } - } - set { - lock (semaphore) { - response = value; - isCompleted = true; - Monitor.PulseAll(semaphore); - } - } - } - } + } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs b/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs index 1f42d165f2..110046fa35 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs @@ -5,129 +5,166 @@ using OpenWire.Client.Commands; using OpenWire.Client.Core; using OpenWire.Client.IO; -namespace OpenWire.Client.Core { - /// - /// Represents the wire format - /// - public class OpenWireFormat { - private DataStreamMarshaller[] dataMarshallers; - private const byte NULL_TYPE = 0; - - - public void addMarshaller(DataStreamMarshaller marshaller) +namespace OpenWire.Client.Core +{ + /// + /// Represents the wire format + /// + public class OpenWireFormat + { + private DataStreamMarshaller[] dataMarshallers; + private const byte NULL_TYPE = 0; + + + public OpenWireFormat() + { + dataMarshallers = new DataStreamMarshaller[256]; + MarshallerFactory factory = new MarshallerFactory(); + factory.configure(this); + } + + public void addMarshaller(DataStreamMarshaller marshaller) + { + byte type = marshaller.GetDataStructureType(); + dataMarshallers[type & 0xFF] = marshaller; + } + + public void Marshal(Object o, BinaryWriter ds) + { + int size = 1; + if (o != null) + { + DataStructure c = (DataStructure) o; + byte type = c.GetDataStructureType(); + DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; + if (dsm == null) + throw new IOException("Unknown data type: " + type); + + BooleanStream bs = new BooleanStream(); + size += dsm.Marshal1(this, c, bs); + size += bs.MarshalledSize(); + + DataStreamMarshaller.WriteInt(size, ds); + DataStreamMarshaller.WriteByte(type, ds); + bs.Marshal(ds); + dsm.Marshal2(this, c, ds, bs); + } + else + { + DataStreamMarshaller.WriteInt(size, ds); + DataStreamMarshaller.WriteByte(NULL_TYPE, ds); + } + } + + public Object Unmarshal(BinaryReader dis) + { + int size = DataStreamMarshaller.ReadInt(dis); + byte dataType = DataStreamMarshaller.ReadByte(dis); + if (dataType != NULL_TYPE) + { + DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; + if (dsm == null) + throw new IOException("Unknown data type: " + dataType); + Console.WriteLine("Parsing type: " + dataType + " with: " + dsm); + Object data = dsm.CreateObject(); + BooleanStream bs = new BooleanStream(); + bs.Unmarshal(dis); + dsm.Unmarshal(this, data, dis, bs); + return data; + } + else + { + return null; + } + } + + public int Marshal1NestedObject(DataStructure o, BooleanStream bs) + { + bs.WriteBoolean(o != null); + if (o == null) + return 0; + + if (o.IsMarshallAware()) + { + MarshallAware ma = (MarshallAware) o; + byte[] sequence = ma.GetMarshalledForm(this); + bs.WriteBoolean(sequence != null); + if (sequence != null) { - dataMarshallers[marshaller.GetDataStructureType()] = marshaller; + return 1 + sequence.Length; + } + } + + byte type = o.GetDataStructureType(); + if (type == 0) { + throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType()); + } + DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; + if (dsm == null) + throw new IOException("Unknown data type: " + type); + Console.WriteLine("Marshalling type: " + type + " with structure: " + o); + return 1 + dsm.Marshal1(this, o, bs); + } + + public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs) + { + if (!bs.ReadBoolean()) + return ; + + byte type = o.GetDataStructureType(); + DataStreamMarshaller.WriteByte(type, ds); + + if (o.IsMarshallAware() && bs.ReadBoolean()) + { + MarshallAware ma = (MarshallAware) o; + byte[] sequence = ma.GetMarshalledForm(this); + ds.Write(sequence, 0, sequence.Length); + } + else + { + + DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; + if (dsm == null) + throw new IOException("Unknown data type: " + type); + dsm.Marshal2(this, o, ds, bs); + } + } + + public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs) + { + if (bs.ReadBoolean()) + { + + byte dataType = DataStreamMarshaller.ReadByte(dis); + DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; + if (dsm == null) + throw new IOException("Unknown data type: " + dataType); + DataStructure data = dsm.CreateObject(); + + if (data.IsMarshallAware() && bs.ReadBoolean()) + { + DataStreamMarshaller.ReadInt(dis); + DataStreamMarshaller.ReadByte(dis); + + BooleanStream bs2 = new BooleanStream(); + bs2.Unmarshal(dis); + dsm.Unmarshal(this, data, dis, bs2); + + // TODO: extract the sequence from the dis and associate it. + // MarshallAware ma = (MarshallAware)data + // ma.setCachedMarshalledForm(this, sequence); + } + else + { + dsm.Unmarshal(this, data, dis, bs); } - public void Marshal(Object o, BinaryWriter ds) { - int size = 1; - if (o != null) { - DataStructure c = (DataStructure) o; - byte type = c.GetDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; - if (dsm == null) - throw new IOException("Unknown data type: " + type); - - BooleanStream bs = new BooleanStream(); - size += dsm.Marshal1(this, c, bs); - size += bs.MarshalledSize(); - - ds.Write(size); - ds.Write(type); - bs.Marshal(ds); - dsm.Marshal2(this, c, ds, bs); - } else { - ds.Write(size); - ds.Write(NULL_TYPE); - } - } - - public Object Unmarshal(BinaryReader dis) { - byte dataType = dis.ReadByte(); - if (dataType != NULL_TYPE) { - DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; - if (dsm == null) - throw new IOException("Unknown data type: " + dataType); - Object data = dsm.CreateObject(); - BooleanStream bs = new BooleanStream(); - bs.Unmarshal(dis); - dsm.Unmarshal(this, data, dis, bs); - return data; - } else { - return null; - } - } - - public int Marshal1NestedObject(DataStructure o, BooleanStream bs) { - bs.WriteBoolean(o != null); - if (o == null) - return 0; - - if (o.IsMarshallAware()) { - MarshallAware ma = (MarshallAware) o; - byte[] sequence = ma.GetMarshalledForm(this); - bs.WriteBoolean(sequence != null); - if (sequence != null) { - return 1 + sequence.Length; - } - } - - byte type = o.GetDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; - if (dsm == null) - throw new IOException("Unknown data type: " + type); - return 1 + dsm.Marshal1(this, o, bs); - } - - public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs) { - if (!bs.ReadBoolean()) - return ; - - byte type = o.GetDataStructureType(); - ds.Write(type); - - if (o.IsMarshallAware() && bs.ReadBoolean()) { - MarshallAware ma = (MarshallAware) o; - byte[] sequence = ma.GetMarshalledForm(this); - ds.Write(sequence, 0, sequence.Length); - } else { - - DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; - if (dsm == null) - throw new IOException("Unknown data type: " + type); - dsm.Marshal2(this, o, ds, bs); - } - } - - public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs) { - if (bs.ReadBoolean()) { - - byte dataType = dis.ReadByte(); - DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; - if (dsm == null) - throw new IOException("Unknown data type: " + dataType); - DataStructure data = dsm.CreateObject(); - - if (data.IsMarshallAware() && bs.ReadBoolean()) { - - dis.ReadInt32(); - dis.ReadByte(); - - BooleanStream bs2 = new BooleanStream(); - bs2.Unmarshal(dis); - dsm.Unmarshal(this, data, dis, bs2); - - // TODO: extract the sequence from the dis and associate it. - // MarshallAware ma = (MarshallAware)data - // ma.setCachedMarshalledForm(this, sequence); - } else { - dsm.Unmarshal(this, data, dis, bs); - } - - return data; - } else { - return null; - } - } + return data; + } + else + { + return null; + } } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs b/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs index 86a56e27d9..69221af974 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs @@ -12,134 +12,193 @@ using OpenWire.Client.Commands; using OpenWire.Client.Core; using OpenWire.Client.IO; -namespace OpenWire.Client.Core { - - /// - /// An implementation of ITransport that uses sockets to communicate with the broker - /// - public class SocketTransport : ITransport { - private readonly object transmissionLock = new object(); - private readonly Socket socket; - private readonly BinaryReader socketReader; - private readonly BinaryWriter socketWriter; - private readonly Thread readThread; - private bool closed; - private IDictionary requestMap = new Hashtable(); // TODO threadsafe - private short nextCommandId; - - public event CommandHandler Command; - public event ExceptionHandler Exception; - private OpenWireFormat wireformat = new OpenWireFormat(); - - public SocketTransport(string host, int port) { - Console.WriteLine("Opening socket to: " + host + " on port: " + port); - socket = Connect(host, port); - socketWriter = new BinaryWriter(new NetworkStream(socket)); - socketReader = new BinaryReader(new NetworkStream(socket)); - - // now lets create the background read thread - readThread = new Thread(new ThreadStart(ReadLoop)); - readThread.Start(); - } - - public void Oneway(Command command) { - BaseCommand baseCommand = (BaseCommand) command; - baseCommand.CommandId = GetNextCommandId(); - baseCommand.ResponseRequired = false; - Send(command); - } - - public FutureResponse AsyncRequest(Command command) { - BaseCommand baseCommand = (BaseCommand) command; - baseCommand.CommandId = GetNextCommandId(); - baseCommand.ResponseRequired = true; - Send(command); - FutureResponse future = new FutureResponse(); - requestMap[baseCommand.CommandId] = future; - return future; - } - - public Response Request(Command command) { - FutureResponse response = AsyncRequest(command); - return response.Response; - } - - public void Dispose() { - Console.WriteLine("Closing the socket"); - lock (transmissionLock) { - socket.Close(); - closed = true; +namespace OpenWire.Client.Core +{ + + /// + /// An implementation of ITransport that uses sockets to communicate with the broker + /// + public class SocketTransport : ITransport + { + private readonly object transmissionLock = new object(); + private readonly Socket socket; + private OpenWireFormat wireformat = new OpenWireFormat(); + private readonly BinaryReader socketReader; + private readonly BinaryWriter socketWriter; + private readonly Thread readThread; + private bool closed; + private IDictionary requestMap = new Hashtable(); // TODO threadsafe + private short nextCommandId; + + public event CommandHandler Command; + public event ExceptionHandler Exception; + + public SocketTransport(string host, int port) + { + Console.WriteLine("Opening socket to: " + host + " on port: " + port); + socket = Connect(host, port); + NetworkStream networkStream = new NetworkStream(socket); + socketWriter = new BinaryWriter(networkStream); + socketReader = new BinaryReader(networkStream); + /* + socketWriter = new BinaryWriter(new NetworkStream(socket)); + socketReader = new BinaryReader(new NetworkStream(socket)); + */ + + // now lets create the background read thread + readThread = new Thread(new ThreadStart(ReadLoop)); + readThread.Start(); + } + + public void Oneway(Command command) + { + command.CommandId = GetNextCommandId(); + command.ResponseRequired = false; + Send(command); + } + + public FutureResponse AsyncRequest(Command command) + { + command.CommandId = GetNextCommandId(); + command.ResponseRequired = true; + Send(command); + FutureResponse future = new FutureResponse(); + requestMap[command.CommandId] = future; + return future; + } + + public Response Request(Command command) + { + FutureResponse response = AsyncRequest(command); + return response.Response; + } + + public void Dispose() + { + Console.WriteLine("Closing the socket"); + lock (transmissionLock) + { + socket.Close(); + closed = true; + } + socketWriter.Close(); + socketReader.Close(); + } + + public void ReadLoop() + { + Console.WriteLine("Starting to read commands from ActiveMQ"); + while (!closed) + { + Command command = null; + try + { + command = (Command) wireformat.Unmarshal(socketReader); + if (command != null) + { + Console.WriteLine("Received command: " + command); + if (command is RemoveInfo) + { + RemoveInfo info = (RemoveInfo) command; + Console.WriteLine("Remove CommandId: " + info.CommandId); + Console.WriteLine("Remove ObjectID: " + info.ObjectId); } - socketWriter.Close(); - socketReader.Close(); + } } - - public void ReadLoop() { - Console.WriteLine("Starting to read commands from ActiveMQ"); - while (!closed) { - BaseCommand command = null; - try { - command = (BaseCommand) wireformat.Unmarshal(socketReader); - } catch (ObjectDisposedException e) { - // stream closed - break; - } - if (command is Response) { - Console.WriteLine("Received response!: " + command); - Response response = (Response) command; - FutureResponse future = (FutureResponse) requestMap[response.CommandId]; - if (future != null) { - if (response is ExceptionResponse) { - ExceptionResponse er = (ExceptionResponse) response; - if (this.Exception != null) { - Exception e = new BrokerException(er.Exception); - this.Exception(this, e); - } - } else { - future.Response = response; - } - } else { - Console.WriteLine("Unknown response ID: " + response.CommandId); - } - } else { - if (this.Command != null) { - this.Command(this, command); - } - } - } + catch (EndOfStreamException e) + { + // stream closed + break; } - - - // Implementation methods - - protected void Send(Command command) { - lock (transmissionLock) { - wireformat.Marshal(command, socketWriter); - socketWriter.Flush(); - } + catch (ObjectDisposedException e) + { + // stream closed + break; } - - protected short GetNextCommandId() { - lock (transmissionLock) { - return++nextCommandId; - } - } - - protected Socket Connect(string host, int port) { - // Looping through the AddressList allows different type of connections to be tried - // (IPv4, IPv6 and whatever else may be available). - IPHostEntry hostEntry = Dns.Resolve(host); - foreach (IPAddress address in hostEntry.AddressList) { - Socket socket = new Socket( - address.AddressFamily, - SocketType.Stream, - ProtocolType.Tcp); - socket.Connect(new IPEndPoint(address, port)); - if (socket.Connected) { - return socket; - } + if (command is Response) + { + Console.WriteLine("Received response!: " + command); + Response response = (Response) command; + FutureResponse future = (FutureResponse) requestMap[response.CommandId]; + if (future != null) + { + if (response is ExceptionResponse) + { + ExceptionResponse er = (ExceptionResponse) response; + Exception e = new BrokerException(er.Exception); + if (this.Exception != null) + { + this.Exception(this, e); + } + else + { + throw e; + } } - throw new SocketException(); - } - } + else + { + future.Response = response; + } + } + else + { + Console.WriteLine("Unknown response ID: " + response.CommandId); + } + } + else + { + if (this.Command != null) + { + this.Command(this, command); + } + else + { + Console.WriteLine("No handler available to process command: " + command); + } + } + } + } + + + // Implementation methods + + protected void Send(Command command) + { + lock (transmissionLock) + { + Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired); + + wireformat.Marshal(command, socketWriter); + socketWriter.Flush(); + } + } + + protected short GetNextCommandId() + { + lock (transmissionLock) + { + return++nextCommandId; + } + } + + protected Socket Connect(string host, int port) + { + // Looping through the AddressList allows different type of connections to be tried + // (IPv4, IPv6 and whatever else may be available). + IPHostEntry hostEntry = Dns.Resolve(host); + foreach (IPAddress address in hostEntry.AddressList) + { + Socket socket = new Socket( + address.AddressFamily, + SocketType.Stream, + ProtocolType.Tcp); + socket.Connect(new IPEndPoint(address, port)); + if (socket.Connected) + { + return socket; + } + } + throw new SocketException(); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs index 86870fc60b..74089be473 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -1,21 +1,30 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { +namespace OpenWire.Client +{ + public delegate void MessageHandler(IMessage message); + + /// + /// A consumer of messages + /// + public interface IMessageConsumer : IDisposable + { + /// - /// A consumer of messages + /// Waits until a message is available and returns it /// - public interface IMessageConsumer : IDisposable { - - /// - /// Waits until a message is available and returns it - /// - IMessage Receive(); - - /// - /// If a message is available immediately it is returned otherwise this method returns null - /// - IMessage ReceiveNoWait(); - - } + IMessage Receive(); + + /// + /// If a message is available immediately it is returned otherwise this method returns null + /// + IMessage ReceiveNoWait(); + + /// + /// An asynchronous listener which can be used to consume messages asynchronously + /// + event MessageHandler Listener; + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs index 0de4085394..452759b1e1 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs @@ -45,7 +45,7 @@ namespace OpenWire.Client.IO base.Unmarshal(wireFormat, o, dataIn, bs); BaseCommand info = (BaseCommand)o; - info.CommandId = dataIn.ReadInt16(); + info.CommandId = DataStreamMarshaller.ReadShort(dataIn); info.ResponseRequired = bs.ReadBoolean(); } @@ -70,7 +70,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); BaseCommand info = (BaseCommand)o; - dataOut.Write((short)info.CommandId); + DataStreamMarshaller.WriteShort(info.CommandId, dataOut); bs.ReadBoolean(); } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs index fe771f35a2..40bce86a60 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs @@ -60,7 +60,7 @@ namespace OpenWire.Client.IO info.BrokerURL = ReadString(dataIn, bs); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerInfo[] value = new BrokerInfo[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerInfo) UnmarshalNestedObject(wireFormat,dataIn, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs index af66c54ddd..ed252c244f 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs @@ -62,7 +62,7 @@ namespace OpenWire.Client.IO info.UserName = ReadString(dataIn, bs); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerId[] value = new BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs index 324a18a240..8cfd215a60 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs @@ -59,17 +59,17 @@ namespace OpenWire.Client.IO info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.Browser = bs.ReadBoolean(); info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs); - info.PrefetchSize = dataIn.ReadInt32(); + info.PrefetchSize = DataStreamMarshaller.ReadInt(dataIn); info.DispatchAsync = bs.ReadBoolean(); info.Selector = ReadString(dataIn, bs); info.SubcriptionName = ReadString(dataIn, bs); info.NoLocal = bs.ReadBoolean(); info.Exclusive = bs.ReadBoolean(); info.Retroactive = bs.ReadBoolean(); - info.Priority = dataIn.ReadByte(); + info.Priority = DataStreamMarshaller.ReadByte(dataIn); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerId[] value = new BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs); @@ -116,14 +116,14 @@ namespace OpenWire.Client.IO Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); bs.ReadBoolean(); Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - dataOut.Write((int) info.PrefetchSize); + DataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut); bs.ReadBoolean(); WriteString(info.Selector, dataOut, bs); WriteString(info.SubcriptionName, dataOut, bs); bs.ReadBoolean(); bs.ReadBoolean(); bs.ReadBoolean(); - dataOut.Write((byte) info.Priority); + DataStreamMarshaller.WriteByte(info.Priority, dataOut); MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); bs.ReadBoolean(); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs index 2f9c94423a..58725018df 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs @@ -58,7 +58,7 @@ namespace OpenWire.Client.IO DataArrayResponse info = (DataArrayResponse)o; if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); DataStructure[] value = new DataStructure[size]; for( int i=0; i < size; i++ ) { value[i] = (DataStructure) UnmarshalNestedObject(wireFormat,dataIn, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs index 69234a8fc7..5148ecf733 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs @@ -58,11 +58,11 @@ namespace OpenWire.Client.IO DestinationInfo info = (DestinationInfo)o; info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs); - info.OperationType = dataIn.ReadByte(); + info.OperationType = DataStreamMarshaller.ReadByte(dataIn); info.Timeout = UnmarshalLong(wireFormat, dataIn, bs); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerId[] value = new BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs); @@ -100,7 +100,7 @@ namespace OpenWire.Client.IO DestinationInfo info = (DestinationInfo)o; Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - dataOut.Write((byte) info.OperationType); + DataStreamMarshaller.WriteByte(info.OperationType, dataOut); Marshal2Long(wireFormat, info.Timeout, dataOut, bs); MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs index 84581a70c8..05baaa5b10 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs @@ -56,7 +56,7 @@ namespace OpenWire.Client.IO base.Unmarshal(wireFormat, o, dataIn, bs); IntegerResponse info = (IntegerResponse)o; - info.Result = dataIn.ReadInt32(); + info.Result = DataStreamMarshaller.ReadInt(dataIn); } @@ -79,7 +79,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); IntegerResponse info = (IntegerResponse)o; - dataOut.Write((int) info.Result); + DataStreamMarshaller.WriteInt(info.Result, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs index 88f37fb685..3db511646e 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs @@ -57,7 +57,7 @@ namespace OpenWire.Client.IO JournalTransaction info = (JournalTransaction)o; info.TransactionId = (TransactionId) UnmarshalNestedObject(wireFormat, dataIn, bs); - info.Type = dataIn.ReadByte(); + info.Type = DataStreamMarshaller.ReadByte(dataIn); info.WasPrepared = bs.ReadBoolean(); } @@ -84,7 +84,7 @@ namespace OpenWire.Client.IO JournalTransaction info = (JournalTransaction)o; Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs); - dataOut.Write((byte) info.Type); + DataStreamMarshaller.WriteByte(info.Type, dataOut); bs.ReadBoolean(); } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs index 860a5d3320..351abf18fd 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs @@ -59,10 +59,10 @@ namespace OpenWire.Client.IO info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs); info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs); - info.AckType = dataIn.ReadByte(); + info.AckType = DataStreamMarshaller.ReadByte(dataIn); info.FirstMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs); info.LastMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs); - info.MessageCount = dataIn.ReadInt32(); + info.MessageCount = DataStreamMarshaller.ReadInt(dataIn); } @@ -93,10 +93,10 @@ namespace OpenWire.Client.IO Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); - dataOut.Write((byte) info.AckType); + DataStreamMarshaller.WriteByte(info.AckType, dataOut); Marshal2NestedObject(wireFormat, info.FirstMessageId, dataOut, bs); Marshal2NestedObject(wireFormat, info.LastMessageId, dataOut, bs); - dataOut.Write((int) info.MessageCount); + DataStreamMarshaller.WriteInt(info.MessageCount, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs index 22c9dc6921..cd084c4a96 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs @@ -59,7 +59,7 @@ namespace OpenWire.Client.IO info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs); info.Message = (Message) UnmarshalNestedObject(wireFormat, dataIn, bs); - info.RedeliveryCounter = dataIn.ReadInt32(); + info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn); } @@ -88,7 +88,7 @@ namespace OpenWire.Client.IO Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); Marshal2NestedObject(wireFormat, info.Message, dataOut, bs); - dataOut.Write((int) info.RedeliveryCounter); + DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs index 6fba723892..0c021f7bcf 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs @@ -52,11 +52,11 @@ namespace OpenWire.Client.IO info.MessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs); info.OriginalTransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.GroupID = ReadString(dataIn, bs); - info.GroupSequence = dataIn.ReadInt32(); + info.GroupSequence = DataStreamMarshaller.ReadInt(dataIn); info.CorrelationId = ReadString(dataIn, bs); info.Persistent = bs.ReadBoolean(); info.Expiration = UnmarshalLong(wireFormat, dataIn, bs); - info.Priority = dataIn.ReadByte(); + info.Priority = DataStreamMarshaller.ReadByte(dataIn); info.ReplyTo = (ActiveMQDestination) UnmarshalNestedObject(wireFormat, dataIn, bs); info.Timestamp = UnmarshalLong(wireFormat, dataIn, bs); info.Type = ReadString(dataIn, bs); @@ -65,10 +65,10 @@ namespace OpenWire.Client.IO info.DataStructure = (DataStructure) UnmarshalNestedObject(wireFormat, dataIn, bs); info.TargetConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.Compressed = bs.ReadBoolean(); - info.RedeliveryCounter = dataIn.ReadInt32(); + info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerId[] value = new BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs); @@ -134,26 +134,26 @@ namespace OpenWire.Client.IO Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); Marshal2CachedObject(wireFormat, info.OriginalTransactionId, dataOut, bs); WriteString(info.GroupID, dataOut, bs); - dataOut.Write((int) info.GroupSequence); + DataStreamMarshaller.WriteInt(info.GroupSequence, dataOut); WriteString(info.CorrelationId, dataOut, bs); bs.ReadBoolean(); Marshal2Long(wireFormat, info.Expiration, dataOut, bs); - dataOut.Write((byte) info.Priority); + DataStreamMarshaller.WriteByte(info.Priority, dataOut); Marshal2NestedObject(wireFormat, info.ReplyTo, dataOut, bs); Marshal2Long(wireFormat, info.Timestamp, dataOut, bs); WriteString(info.Type, dataOut, bs); if(bs.ReadBoolean()) { - dataOut.Write((int)info.Content.Length); + DataStreamMarshaller.WriteInt(info.Content.Length, dataOut); dataOut.Write(info.Content); } if(bs.ReadBoolean()) { - dataOut.Write((int)info.MarshalledProperties.Length); + DataStreamMarshaller.WriteInt(info.MarshalledProperties.Length, dataOut); dataOut.Write(info.MarshalledProperties); } Marshal2NestedObject(wireFormat, info.DataStructure, dataOut, bs); Marshal2CachedObject(wireFormat, info.TargetConsumerId, dataOut, bs); bs.ReadBoolean(); - dataOut.Write((int) info.RedeliveryCounter); + DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); Marshal2Long(wireFormat, info.Arrival, dataOut, bs); WriteString(info.UserID, dataOut, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs index 42ce3ce5be..99403c7454 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs @@ -60,7 +60,7 @@ namespace OpenWire.Client.IO info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs); if (bs.ReadBoolean()) { - short size = dataIn.ReadInt16(); + short size = DataStreamMarshaller.ReadShort(dataIn); BrokerId[] value = new BrokerId[size]; for( int i=0; i < size; i++ ) { value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs); diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs index 8d8af83d66..012c2122c7 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs @@ -56,7 +56,7 @@ namespace OpenWire.Client.IO base.Unmarshal(wireFormat, o, dataIn, bs); Response info = (Response)o; - info.CorrelationId = dataIn.ReadInt16(); + info.CorrelationId = DataStreamMarshaller.ReadShort(dataIn); } @@ -79,7 +79,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); Response info = (Response)o; - dataOut.Write((short)info.CorrelationId); + DataStreamMarshaller.WriteShort(info.CorrelationId, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs index 3256aceb0a..3cbf95b131 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs @@ -58,7 +58,7 @@ namespace OpenWire.Client.IO TransactionInfo info = (TransactionInfo)o; info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs); info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs); - info.Type = dataIn.ReadByte(); + info.Type = DataStreamMarshaller.ReadByte(dataIn); } @@ -85,7 +85,7 @@ namespace OpenWire.Client.IO TransactionInfo info = (TransactionInfo)o; Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); - dataOut.Write((byte) info.Type); + DataStreamMarshaller.WriteByte(info.Type, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs index 2e1fa8cd77..585c962160 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs @@ -57,8 +57,8 @@ namespace OpenWire.Client.IO WireFormatInfo info = (WireFormatInfo)o; info.Magic = ReadBytes(dataIn, 8); - info.Version = dataIn.ReadInt32(); - info.Options = dataIn.ReadInt32(); + info.Version = DataStreamMarshaller.ReadInt(dataIn); + info.Options = DataStreamMarshaller.ReadInt(dataIn); } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO WireFormatInfo info = (WireFormatInfo)o; dataOut.Write(info.Magic, 0, 8); - dataOut.Write((int) info.Version); - dataOut.Write((int) info.Options); + DataStreamMarshaller.WriteInt(info.Version, dataOut); + DataStreamMarshaller.WriteInt(info.Options, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs index c7cb793077..545555e09e 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs @@ -56,7 +56,7 @@ namespace OpenWire.Client.IO base.Unmarshal(wireFormat, o, dataIn, bs); XATransactionId info = (XATransactionId)o; - info.FormatId = dataIn.ReadInt32(); + info.FormatId = DataStreamMarshaller.ReadInt(dataIn); info.GlobalTransactionId = ReadBytes(dataIn, bs.ReadBoolean()); info.BranchQualifier = ReadBytes(dataIn, bs.ReadBoolean()); @@ -85,13 +85,13 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); XATransactionId info = (XATransactionId)o; - dataOut.Write((int) info.FormatId); + DataStreamMarshaller.WriteInt(info.FormatId, dataOut); if(bs.ReadBoolean()) { - dataOut.Write((int)info.GlobalTransactionId.Length); + DataStreamMarshaller.WriteInt(info.GlobalTransactionId.Length, dataOut); dataOut.Write(info.GlobalTransactionId); } if(bs.ReadBoolean()) { - dataOut.Write((int)info.BranchQualifier.Length); + DataStreamMarshaller.WriteInt(info.BranchQualifier.Length, dataOut); dataOut.Write(info.BranchQualifier); } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs index 3ec1f4aa6b..4f41194c7b 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -1,42 +1,63 @@ using System; +using System.Threading; using OpenWire.Client.Commands; -namespace OpenWire.Client { +namespace OpenWire.Client +{ + /// + /// An object capable of receiving messages from some destination + /// + public class MessageConsumer : IMessageConsumer + { + + private Session session; + private ConsumerInfo info; + private bool closed; + + public event MessageHandler Listener; + + public MessageConsumer(Session session, ConsumerInfo info) + { + this.session = session; + this.info = info; + } + /// - /// An object capable of receiving messages from some destination + /// Method Dispatch /// - public class MessageConsumer : IMessageConsumer { - - private Session session; - private ConsumerInfo info; - private bool closed; - - public MessageConsumer(Session session, ConsumerInfo info) { - this.session = session; - this.info = info; - } - - public IMessage Receive() { - CheckClosed(); - // TODO - return null; - } - - public IMessage ReceiveNoWait() { - CheckClosed(); - // TODO - return null; - } - - public void Dispose() { - session.DisposeOf(info.ConsumerId); - closed = true; - } - - protected void CheckClosed() { - if (closed) { - throw new ConnectionClosedException(); - } - } - } + /// An ActiveMQMessage + public void Dispatch(ActiveMQMessage message) + { + Console.WriteLine("Dispatching message to consumer: " + message); + } + + public IMessage Receive() + { + CheckClosed(); + Thread.Sleep(60000); + // TODO + return null; + } + + public IMessage ReceiveNoWait() + { + CheckClosed(); + // TODO + return null; + } + + public void Dispose() + { + session.DisposeOf(info.ConsumerId); + closed = true; + } + + protected void CheckClosed() + { + if (closed) + { + throw new ConnectionClosedException(); + } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs index 03a029d23f..d3e8dac5b7 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs @@ -1,30 +1,52 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { - /// - /// An object capable of sending messages to some destination - /// - public class MessageProducer : IMessageProducer { - - private Session session; - private ProducerInfo info; - - public MessageProducer(Session session, ProducerInfo info) { - this.session = session; - this.info = info; - } - - public void Send(IMessage message) { - Send(info.Destination, message); - } - - public void Send(IDestination destination, IMessage message) { - session.DoSend(destination, message); - } - - public void Dispose() { - session.DisposeOf(info.ProducerId); - } - } +namespace OpenWire.Client +{ + /// + /// An object capable of sending messages to some destination + /// + public class MessageProducer : IMessageProducer + { + + private Session session; + private ProducerInfo info; + private long messageCounter; + + + public MessageProducer(Session session, ProducerInfo info) + { + this.session = session; + this.info = info; + } + + public void Send(IMessage message) + { + Send(info.Destination, message); + } + + public void Send(IDestination destination, IMessage message) + { + MessageId id = new MessageId(); + id.ProducerId = info.ProducerId; + lock (this) + { + id.ProducerSequenceId = ++messageCounter; + } + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + activeMessage.MessageId = id; + activeMessage.ProducerId = info.ProducerId; + activeMessage.Destination = (ActiveMQDestination) destination; + + Console.WriteLine("About to send message with MessageId: " + activeMessage.MessageId); + Console.WriteLine("About to send message with ProducerId: " + activeMessage.ProducerId); + Console.WriteLine("About to send message with Destination: " + activeMessage.Destination); + session.DoSend(destination, message); + } + + public void Dispose() + { + session.DisposeOf(info.ProducerId); + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index b4e00873f3..5f15abe55d 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -2,102 +2,170 @@ using System; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client { +namespace OpenWire.Client +{ + /// + /// Default provider of ISession + /// + public class Session : ISession + { + private Connection connection; + private AcknowledgementMode acknowledgementMode; + private SessionInfo info; + private long consumerCounter; + private long producerCounter; + private int prefetchSize = 1000; + + public Session(Connection connection, SessionInfo info) + { + this.connection = connection; + this.info = info; + } + + public void Dispose() + { + DisposeOf(info.SessionId); + } + + public IMessageProducer CreateProducer() + { + return CreateProducer(null); + } + + public IMessageProducer CreateProducer(IDestination destination) + { + ProducerInfo command = CreateProducerInfo(destination); + connection.SyncRequest(command); + return new MessageProducer(this, command); + } + + public void Acknowledge(Message message) + { + if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) + { + MessageAck ack = new MessageAck(); + // TODO complete packet + connection.SyncRequest(ack); + } + } + + public IMessageConsumer CreateConsumer(IDestination destination) + { + return CreateConsumer(destination, null); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector) + { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + connection.SyncRequest(command); + MessageConsumer consumer = new MessageConsumer(this, command); + connection.AddConsumer(command.ConsumerId, consumer); + return consumer; + } + + public IQueue GetQueue(string name) + { + return new ActiveMQQueue(name); + } + + public ITopic GetTopic(string name) + { + return new ActiveMQTopic(name); + } + + public IMessage CreateMessage() + { + ActiveMQMessage answer = new ActiveMQMessage(); + Configure(answer); + return answer; + } + + + public ITextMessage CreateTextMessage() + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(); + Configure(answer); + return answer; + } + + public ITextMessage CreateTextMessage(string text) + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(text); + Configure(answer); + return answer; + } + + // Implementation methods + public void DoSend(IDestination destination, IMessage message) + { + ActiveMQMessage command = ActiveMQMessage.Transform(message); + // TODO complete packet + connection.SyncRequest(command); + } + + public void DisposeOf(DataStructure objectId) + { + Console.WriteLine("Disposing of session: " + objectId + " with datatype: " + objectId.GetDataStructureType()); + /* + RemoveInfo command = new RemoveInfo(); + command.ObjectId = objectId; + connection.SyncRequest(command); + */ + } + + public void DisposeOf(ConsumerId objectId) + { + Console.WriteLine("Disposing of consumer: " + objectId); + connection.RemoveConsumer(objectId); + /* + RemoveInfo command = new RemoveInfo(); + command.ObjectId = objectId; + connection.SyncRequest(command); + */ + } + + protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) + { + ConsumerInfo answer = new ConsumerInfo(); + ConsumerId id = new ConsumerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + lock (this) + { + id.Value = ++consumerCounter; + } + answer.ConsumerId = id; + answer.Destination = (ActiveMQDestination) destination; + answer.Selector = selector; + answer.PrefetchSize = prefetchSize; + + // TODO configure other features on the consumer + return answer; + } + + protected ProducerInfo CreateProducerInfo(IDestination destination) + { + ProducerInfo answer = new ProducerInfo(); + ProducerId id = new ProducerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + lock (this) + { + id.Value = ++producerCounter; + } + answer.ProducerId = id; + answer.Destination = (ActiveMQDestination) destination; + return answer; + } + /// - /// Default provider of ISession + /// Configures the message command /// - public class Session : ISession { - private Connection connection; - private AcknowledgementMode acknowledgementMode; - private SessionInfo info; - private long consumerCounter; - - public Session(Connection connection, SessionInfo info) { - this.connection = connection; - this.info = info; - } - - public void Dispose() { - DisposeOf(info.SessionId); - } - - public IMessageProducer CreateProducer() { - return CreateProducer(null); - } - - public IMessageProducer CreateProducer(IDestination destination) { - ProducerInfo command = CreateProducerInfo(destination); - connection.SyncRequest(command); - return new MessageProducer(this, command); - } - - public void Acknowledge(Message message) { - if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) { - MessageAck ack = new MessageAck(); - // TODO complete packet - connection.SyncRequest(ack); - } - } - - public IMessageConsumer CreateConsumer(IDestination destination) { - return CreateConsumer(destination, null); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector) { - ConsumerInfo command = CreateConsumerInfo(destination, selector); - connection.SyncRequest(command); - return new MessageConsumer(this, command); - } - - public IQueue GetQueue(string name) { - return new ActiveMQQueue(name); - } - - public ITopic GetTopic(string name) { - return new ActiveMQTopic(name); - } - - public IMessage CreateMessage() { - return new ActiveMQMessage(); - } - - public ITextMessage CreateTextMessage() { - return new ActiveMQTextMessage(); - } - - public ITextMessage CreateTextMessage(string text) { - return new ActiveMQTextMessage(text); - } - - // Implementation methods - public void DoSend(IDestination destination, IMessage message) { - ActiveMQMessage command = ActiveMQMessage.Transform(message); - // TODO complete packet - connection.SyncRequest(command); - } - - public void DisposeOf(DataStructure objectId) { - RemoveInfo command = new RemoveInfo(); - command.ObjectId = objectId; - connection.SyncRequest(command); - } - - protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) { - ConsumerInfo answer = new ConsumerInfo(); - ConsumerId consumerId = new ConsumerId(); - consumerId.SessionId = info.SessionId.Value; - lock (this) { - consumerId.Value = ++consumerCounter; - } - // TODO complete packet - answer.ConsumerId = consumerId; - return answer; - } - - protected ProducerInfo CreateProducerInfo(IDestination destination) { - ProducerInfo info = new ProducerInfo(); - // TODO complete packet - return info; - } - } + /// An ActiveMQMessage + /// An IMessage + protected void Configure(ActiveMQMessage message) + { + } + + } } diff --git a/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs b/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs index c8381a03d7..3c2b503bbd 100644 --- a/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/ClientTest.cs @@ -4,39 +4,84 @@ using System.IO; using NUnit.Framework; using OpenWire.Client; +using OpenWire.Client.Core; -namespace OpenWire.Client { - - [ TestFixture ] - public class ClientTest : TestSupport { - - [ Test ] - public void SendAndSyncReceive() { - IConnectionFactory factory = new ConnectionFactory("localhost", 61616); +namespace OpenWire.Client +{ + + [ TestFixture ] + public class ClientTest : TestSupport + { + + [ Test ] + public void CreateOpenWireFormat() + { + OpenWireFormat format = new OpenWireFormat(); + Assert.IsTrue(format != null); + } + + [ Test ] + public void CreateConnectionFactory() + { + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + Assert.IsTrue(factory != null, "created valid factory: " + factory); + } + + [ Test ] + public void SendAndSyncReceive() + { + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + + Assert.IsTrue(factory != null, "no factory created"); + + Console.WriteLine("Worked!"); + + using (IConnection connection = factory.CreateConnection()) + { + try + { + Assert.IsTrue(connection != null, "no connection created"); + Console.WriteLine("Created a connection!"); + + ISession session = connection.CreateSession(); + Console.WriteLine("Created a session: " + session); + + IDestination destination = session.GetQueue("FOO.BAR"); + Assert.IsTrue(destination != null, "No queue available!"); + Console.WriteLine("Using destination: " + destination); + + IMessageConsumer consumer = session.CreateConsumer(destination); + Console.WriteLine("Created consumer!: " + consumer); + + IMessageProducer producer = session.CreateProducer(destination); + Console.WriteLine("Created producer!: " + producer); + + string expected = "Hello World!"; + ITextMessage request = session.CreateTextMessage(expected); + Console.WriteLine("### About to send message: " + request); + + producer.Send(request); + Console.WriteLine("### Sent message!"); + + ITextMessage message = (ITextMessage) consumer.Receive(); + if (message == null) + { + Console.WriteLine("### No message!!"); + } + else + { + Console.WriteLine("### Received message: " + message + " of type: " + message.GetType()); + String actual = message.Text; - Assert.IsTrue(factory != null, "created valid factory: " + factory); - - Console.WriteLine("Worked!"); - - using (IConnection connection = factory.CreateConnection()) { - ISession session = connection.CreateSession(); - Console.WriteLine("Created a session: " + session); - - IDestination destination = session.GetQueue("FOO.BAR"); - Assert.IsTrue(destination != null, "No queue available!"); - Console.WriteLine("Using destination: " + destination); - - IMessageConsumer consumer = session.CreateConsumer(destination); - - IMessageProducer producer = session.CreateProducer(destination); - string expected = "Hello World!"; - ITextMessage request = session.CreateTextMessage(expected); - producer.Send(request); - - ITextMessage message = (ITextMessage) consumer.Receive(); - - Assert.AreEqual(expected, message.Text); - } - } - } + Console.WriteLine("### Message text is: " + actual); + } + } + catch (Exception e) + { + Console.WriteLine("Caught: " + e); + } + } + } + } } + diff --git a/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs b/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs new file mode 100644 index 0000000000..c0049fbcd0 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs @@ -0,0 +1,60 @@ +using NUnit.Framework; +using OpenWire.Client.Core; +using System; + +namespace openwire_dotnet +{ + [TestFixture] + public class EndianTest + { + + [Test] + public void TestLongEndian() + { + long value = 0x0102030405060708l; + + long newValue = DataStreamMarshaller.SwitchEndian(value); + + Console.WriteLine("New value: " + newValue); + + Assert.AreEqual(0x0807060504030201L, newValue); + + long actual = DataStreamMarshaller.SwitchEndian(newValue); + + Assert.AreEqual(value, actual); + } + + [Test] + public void TestIntEndian() + { + int value = 0x12345678; + + int newValue = DataStreamMarshaller.SwitchEndian(value); + + Console.WriteLine("New value: " + newValue); + + Assert.AreEqual(0x78563412, newValue); + + int actual = DataStreamMarshaller.SwitchEndian(newValue); + + Assert.AreEqual(value, actual); + } + [Test] + public void TestShortEndian() + { + short value = 0x1234; + + short newValue = DataStreamMarshaller.SwitchEndian(value); + + Console.WriteLine("New value: " + newValue); + + Assert.AreEqual(0x3412, newValue); + + short actual = DataStreamMarshaller.SwitchEndian(newValue); + + Assert.AreEqual(value, actual); + } + } +} + + diff --git a/openwire-dotnet/tests/OpenWire.Client/TestMain.cs b/openwire-dotnet/tests/OpenWire.Client/TestMain.cs new file mode 100644 index 0000000000..d305ac796a --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/TestMain.cs @@ -0,0 +1,59 @@ +using System; +using System.IO; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace openwire_dotnet +{ + public class TestMain + { + public static void Main(string[] args) + { + try + { + Console.WriteLine("About to connect to ActiveMQ"); + + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + + Console.WriteLine("Worked!"); + + using (IConnection connection = factory.CreateConnection()) + { + Console.WriteLine("Created a connection!"); + + ISession session = connection.CreateSession(); + Console.WriteLine("Created a session: " + session); + + IDestination destination = session.GetQueue("FOO.BAR"); + Console.WriteLine("Using destination: " + destination); + + IMessageConsumer consumer = session.CreateConsumer(destination); + + IMessageProducer producer = session.CreateProducer(destination); + string expected = "Hello World!"; + ITextMessage request = session.CreateTextMessage(expected); + producer.Send(request); + + ITextMessage message = (ITextMessage) consumer.Receive(); + if (message == null) + { + Console.WriteLine("### No message!!"); + } + else + { + Console.WriteLine("### Received message: " + message + " of type: " + message.GetType()); + String actual = message.Text; + + Console.WriteLine("### Message text is: " + actual); + } + } + } + catch (Exception e) + { + Console.WriteLine("Caught: " + e); + Console.WriteLine("Stack: " + e.StackTrace); + } + } + } +}