updated working .Net code which is capable of creating connections, sessions, producers and consumers - not quite completed the consumer side yet but we can send messages now! :)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-23 18:16:15 +00:00
parent eaafdbd69a
commit 8ba8da2388
38 changed files with 2166 additions and 1450 deletions

View File

@ -22,13 +22,13 @@ namespace OpenWire.Client.Commands {
public override bool IsMarshallAware() {
return true;
}
return true;
}
// Properties
public IDestination FromDestination {
get { return Destination; }
set { this.Destination = ActiveMQDestination.Transform(value); }
get { return Destination; }
set { this.Destination = ActiveMQDestination.Transform(value); }
}
public void BeforeMarshall(OpenWireFormat wireFormat) {

View File

@ -3,28 +3,40 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQQueue.
/// </summary>
public class ActiveMQQueue : ActiveMQDestination, IQueue {
public const byte ID_ActiveMQQueue = 100;
public ActiveMQQueue() : base() {
}
public ActiveMQQueue(String name) : base(name) {
}
public String QueueName {
get { return PhysicalName; }
}
public override int GetDestinationType() {
return ACTIVEMQ_QUEUE;
}
public override ActiveMQDestination CreateDestination(String name) {
return new ActiveMQQueue(name);
}
}
namespace OpenWire.Client.Commands
{
/// <summary>
/// Summary description for ActiveMQQueue.
/// </summary>
public class ActiveMQQueue : ActiveMQDestination, IQueue
{
public const byte ID_ActiveMQQueue = 100;
public ActiveMQQueue() : base()
{
}
public ActiveMQQueue(String name) : base(name)
{
}
public String QueueName
{
get { return PhysicalName; }
}
public override byte GetDataStructureType()
{
return ID_ActiveMQQueue;
}
public override int GetDestinationType()
{
return ACTIVEMQ_QUEUE;
}
public override ActiveMQDestination CreateDestination(String name)
{
return new ActiveMQQueue(name);
}
}
}

View File

@ -3,28 +3,40 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTempQueue.
/// </summary>
public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue {
public const byte ID_ActiveMQTempQueue = 102;
public ActiveMQTempQueue() : base() {
}
public ActiveMQTempQueue(String name) : base(name) {
}
public String GetQueueName() {
return PhysicalName;
}
public override int GetDestinationType() {
return ACTIVEMQ_QUEUE;
}
public override ActiveMQDestination CreateDestination(String name) {
return new ActiveMQTempQueue(name);
}
}
namespace OpenWire.Client.Commands
{
/// <summary>
/// Summary description for ActiveMQTempQueue.
/// </summary>
public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue
{
public const byte ID_ActiveMQTempQueue = 102;
public ActiveMQTempQueue() : base()
{
}
public ActiveMQTempQueue(String name) : base(name)
{
}
public String GetQueueName()
{
return PhysicalName;
}
public override byte GetDataStructureType()
{
return ID_ActiveMQTempQueue;
}
public override int GetDestinationType()
{
return ACTIVEMQ_QUEUE;
}
public override ActiveMQDestination CreateDestination(String name)
{
return new ActiveMQTempQueue(name);
}
}
}

View File

@ -3,27 +3,40 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTempTopic.
/// </summary>
public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic {
public const byte ID_ActiveMQTempTopic = 103;
public ActiveMQTempTopic() : base() {
}
public ActiveMQTempTopic(String name) : base(name) {
}
public String GetTopicName() {
return PhysicalName;
}
public override int GetDestinationType() {
return ACTIVEMQ_TOPIC;
}
public override ActiveMQDestination CreateDestination(String name) {
return new ActiveMQTempTopic(name);
}
}
namespace OpenWire.Client.Commands
{
/// <summary>
/// Summary description for ActiveMQTempTopic.
/// </summary>
public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic
{
public const byte ID_ActiveMQTempTopic = 103;
public ActiveMQTempTopic() : base()
{
}
public ActiveMQTempTopic(String name) : base(name)
{
}
public String GetTopicName()
{
return PhysicalName;
}
public override byte GetDataStructureType()
{
return ID_ActiveMQTempTopic;
}
public override int GetDestinationType()
{
return ACTIVEMQ_TOPIC;
}
public override ActiveMQDestination CreateDestination(String name)
{
return new ActiveMQTempTopic(name);
}
}
}

View File

@ -3,28 +3,40 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTopic.
/// </summary>
public class ActiveMQTopic : ActiveMQDestination, ITopic {
public const byte ID_ActiveMQTopic = 101;
public ActiveMQTopic() : base() {
}
public ActiveMQTopic(String name) : base(name) {
}
public String TopicName {
get { return PhysicalName; }
}
public override int GetDestinationType() {
return ACTIVEMQ_TOPIC;
}
public override ActiveMQDestination CreateDestination(String name) {
return new ActiveMQTopic(name);
}
}
namespace OpenWire.Client.Commands
{
/// <summary>
/// Summary description for ActiveMQTopic.
/// </summary>
public class ActiveMQTopic : ActiveMQDestination, ITopic
{
public const byte ID_ActiveMQTopic = 101;
public ActiveMQTopic() : base()
{
}
public ActiveMQTopic(String name) : base(name)
{
}
public String TopicName
{
get { return PhysicalName; }
}
public override byte GetDataStructureType()
{
return ID_ActiveMQTopic;
}
public override int GetDestinationType()
{
return ACTIVEMQ_TOPIC;
}
public override ActiveMQDestination CreateDestination(String name)
{
return new ActiveMQTopic(name);
}
}
}

View File

@ -13,51 +13,38 @@ using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands {
public class BaseCommand : AbstractCommand {
public const byte ID_BaseCommand = 0;
short commandId;
bool responseRequired;
public override int GetHashCode() {
return commandId;
}
public override bool Equals(Object that) {
if (that is BaseCommand) {
BaseCommand thatCommand = (BaseCommand) that;
return this.GetDataStructureType() == thatCommand.GetDataStructureType()
&& this.CommandId == thatCommand.CommandId;
}
return false;
}
public override String ToString() {
string answer = GetDataStructureTypeAsString(GetDataStructureType());
if (answer.Length == 0) {
answer = base.ToString();
}
return answer + ": id = " + CommandId;
}
public override byte GetDataStructureType() {
return ID_BaseCommand;
}
// Properties
public short CommandId {
get { return commandId; }
set { this.commandId = value; }
}
public bool ResponseRequired {
get { return responseRequired; }
set { this.responseRequired = value; }
}
}
namespace OpenWire.Client.Commands
{
public abstract class BaseCommand : AbstractCommand
{
public override int GetHashCode()
{
return (CommandId * 37) + GetDataStructureType();
}
public override bool Equals(Object that)
{
if (that is BaseCommand)
{
BaseCommand thatCommand = (BaseCommand) that;
return this.GetDataStructureType() == thatCommand.GetDataStructureType()
&& this.CommandId == thatCommand.CommandId;
}
return false;
}
public override String ToString()
{
string answer = GetDataStructureTypeAsString(GetDataStructureType());
if (answer.Length == 0)
{
answer = base.ToString();
}
return answer + ": id = " + CommandId;
}
}
}

View File

@ -3,117 +3,214 @@ using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
namespace OpenWire.Client
{
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public class Connection : IConnection
{
static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
private ITransport transport;
private ConnectionInfo info;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
IList sessions = new ArrayList();
private bool transacted;
private bool connected;
private bool closed;
private AcknowledgementMode acknowledgementMode;
private long sessionCounter;
private IDictionary consumers = new Hashtable(); // TODO threadsafe
public Connection(ITransport transport, ConnectionInfo info)
{
this.transport = transport;
this.info = info;
this.transport.Command += new CommandHandler(OnCommand);
}
/// <summary>
/// Represents a connection with a message broker
/// Creates a new session to work on this connection
/// </summary>
public class Connection : IConnection {
private ConnectionInfo info;
private ITransport transport;
IList sessions = new ArrayList();
private bool transacted;
private bool connected;
private bool closed;
private AcknowledgementMode acknowledgementMode;
private long sessionCounter;
public Connection(ITransport transport, ConnectionInfo info) {
this.transport = transport;
this.info = info;
public ISession CreateSession()
{
return CreateSession(transacted, acknowledgementMode);
}
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode)
{
CheckConnected();
SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
SyncRequest(info);
Session session = new Session(this, info);
sessions.Add(session);
return session;
}
public void Dispose()
{
foreach (Session session in sessions)
{
session.Dispose();
}
sessions.Clear();
transport.Dispose();
closed = true;
}
// Properties
public ITransport ITransport
{
get { return transport; }
set { this.transport = value; }
}
public bool Transacted
{
get { return transacted; }
set { this.transacted = value; }
}
public AcknowledgementMode AcknowledgementMode
{
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
public string ClientId
{
get { return info.ClientId; }
set {
if (connected)
{
throw new OpenWireException("You cannot change the ClientId once the Connection is connected");
}
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession() {
return CreateSession(transacted, acknowledgementMode);
info.ClientId = value;
}
}
// Implementation methods
/// <summary>
/// Performs a synchronous request-response with the broker
/// </summary>
public Response SyncRequest(Command command)
{
Response response = transport.Request(command);
if (response is ExceptionResponse)
{
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
// TODO include stack trace
throw new OpenWireException("Request failed: " + exceptionResponse);
}
return response;
}
protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode)
{
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
lock (this)
{
sessionId.Value = ++sessionCounter;
}
answer.SessionId = sessionId;
return answer;
}
protected void CheckConnected()
{
if (closed)
{
throw new ConnectionClosedException();
}
if (!connected)
{
Console.WriteLine("ConnectionId: " + info.ConnectionId.Value);
Console.WriteLine("ClientID: " + info.ClientId);
Console.WriteLine("About to send WireFormatInfo: " + wireFormatInfo);
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
transport.Oneway(wireFormatInfo);
Console.WriteLine("About to send ConnectionInfo: " + info);
SyncRequest(info);
Console.WriteLine("Received connection info response");
connected = true;
}
}
/// <summary>
/// Register a new consumer
/// </summary>
/// <param name="consumerId">A ConsumerId</param>
/// <param name="consumer">A MessageConsumer</param>
public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer)
{
Console.WriteLine("#### Adding consumerId: " + consumerId.Value + " session: " + consumerId.SessionId + " with consumer: " + consumer);
consumers[consumerId] = consumer;
}
/// <summary>
/// Remove a consumer
/// </summary>
/// <param name="consumerId">A ConsumerId</param>
public void RemoveConsumer(ConsumerId consumerId)
{
consumers[consumerId] = null;
}
/// <summary>
/// Handle incoming commands
/// </summary>
/// <param name="transport">An ITransport</param>
/// <param name="command">A Command</param>
protected void OnCommand(ITransport transport, Command command)
{
if (command is MessageDispatch) {
MessageDispatch dispatch = (MessageDispatch) command;
ConsumerId consumerId = dispatch.ConsumerId;
MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
if (consumer == null) {
Console.WriteLine("No such consumer active: " + consumerId);
Console.WriteLine("No such consumer active: " + consumerId.Value);
Console.WriteLine("No such consumer active: " + consumerId.SessionId);
}
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) {
CheckConnected();
SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
SyncRequest(info);
Session session = new Session(this, info);
sessions.Add(session);
return session;
else {
ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
consumer.Dispatch(message);
}
public void Dispose() {
foreach (Session session in sessions) {
session.Dispose();
}
sessions.Clear();
transport.Dispose();
closed = true;
}
// Properties
public ITransport ITransport {
get { return transport; }
set { this.transport = value; }
}
public bool Transacted {
get { return transacted; }
set { this.transacted = value; }
}
public AcknowledgementMode AcknowledgementMode {
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
public string ClientId {
get { return info.ClientId; }
set {
if (connected) {
throw new OpenWireException("You cannot change the ClientId once the Connection is connected");
}
info.ClientId = value;
}
}
// Implementation methods
/// <summary>
/// Performs a synchronous request-response with the broker
/// </summary>
public Response SyncRequest(Command command) {
CheckConnected();
Response response = ITransport.Request(command);
if (response is ExceptionResponse) {
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
// TODO include stack trace
throw new OpenWireException("Request failed: " + exceptionResponse);
}
return response;
}
protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) {
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
lock (this) {
sessionId.Value = ++sessionCounter;
}
answer.SessionId = sessionId;
return answer;
}
protected void CheckConnected() {
if (closed) {
throw new ConnectionClosedException();
}
if (!connected) {
SyncRequest(info);
connected = true;
}
}
}
}
else {
Console.WriteLine("Unknown command: " + command);
}
}
/// <summary>
/// Method CreateMagicBytes
/// </summary>
/// <returns>A byte[]</retutns>
private byte[] CreateMagicBytes()
{
byte[] answer = new byte[MAGIC.Length];
for (int i = 0; i < answer.Length; i++)
{
answer[i] = (byte) MAGIC[i];
}
return answer;
}
}
}

View File

@ -3,82 +3,102 @@ using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public class ConnectionFactory : IConnectionFactory {
private string host = "localhost";
private int port = 61616;
private string userName;
private string password;
private string clientId;
public ConnectionFactory() {
}
public ConnectionFactory(string host, int port) {
this.host = host;
this.port = port;
}
public IConnection CreateConnection() {
return CreateConnection(userName, password);
}
public IConnection CreateConnection(string userName, string password) {
ConnectionInfo info = CreateConnectionInfo(userName, password);
ITransport transport = CreateITransport();
Connection connection = new Connection(transport, info);
connection.ClientId = clientId;
return connection;
}
// Properties
public string Host {
get { return host; }
set { host = value; }
}
public int Port {
get { return port; }
set { port = value; }
}
public string UserName {
get { return userName; }
set { userName = value; }
}
public string Password {
get { return password; }
set { password = value; }
}
public string ClientId {
get { return clientId; }
set { clientId = value; }
}
// Implementation methods
protected ConnectionInfo CreateConnectionInfo(string userName, string password) {
ConnectionInfo answer = new ConnectionInfo();
ConnectionId connectionId = new ConnectionId();
connectionId.Value = CreateNewConnectionID();
answer.ConnectionId = connectionId;
answer.UserName = userName;
answer.Password = password;
return answer;
}
protected string CreateNewConnectionID() {
return Guid.NewGuid().ToString();
}
protected ITransport CreateITransport() {
return new SocketTransport(host, port);
}
}
namespace OpenWire.Client
{
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
private string host = "localhost";
private int port = 61616;
private string userName;
private string password;
private string clientId;
public ConnectionFactory()
{
}
public ConnectionFactory(string host, int port)
{
this.host = host;
this.port = port;
}
public IConnection CreateConnection()
{
return CreateConnection(userName, password);
}
public IConnection CreateConnection(string userName, string password)
{
ConnectionInfo info = CreateConnectionInfo(userName, password);
ITransport transport = CreateTransport();
Connection connection = new Connection(transport, info);
connection.ClientId = info.ClientId;
return connection;
}
// Properties
public string Host
{
get { return host; }
set { host = value; }
}
public int Port
{
get { return port; }
set { port = value; }
}
public string UserName
{
get { return userName; }
set { userName = value; }
}
public string Password
{
get { return password; }
set { password = value; }
}
public string ClientId
{
get { return clientId; }
set { clientId = value; }
}
// Implementation methods
protected ConnectionInfo CreateConnectionInfo(string userName, string password)
{
ConnectionInfo answer = new ConnectionInfo();
ConnectionId connectionId = new ConnectionId();
connectionId.Value = CreateNewGuid();
answer.ConnectionId = connectionId;
answer.UserName = userName;
answer.Password = password;
answer.ClientId = clientId;
if (clientId == null)
{
answer.ClientId = CreateNewGuid();
}
return answer;
}
protected string CreateNewGuid()
{
return Guid.NewGuid().ToString();
}
protected ITransport CreateTransport()
{
return new SocketTransport(host, port);
}
}
}

View File

@ -3,85 +3,111 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for AbstractCommand.
/// </summary>
public abstract class AbstractCommand : Command {
protected AbstractCommand() {
}
public virtual byte GetDataStructureType() {
return 0;
}
public virtual bool IsMarshallAware() {
return false;
}
public static String GetDataStructureTypeAsString(int type) {
String packetTypeStr = "";
switch (type) {
case ActiveMQMessage.ID_ActiveMQMessage :
packetTypeStr = "ACTIVEMQ_MESSAGE";
break;
case ActiveMQTextMessage.ID_ActiveMQTextMessage :
packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
break;
case ActiveMQObjectMessage.ID_ActiveMQObjectMessage:
packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
break;
case ActiveMQBytesMessage.ID_ActiveMQBytesMessage :
packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
break;
case ActiveMQStreamMessage.ID_ActiveMQStreamMessage :
packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
break;
case ActiveMQMapMessage.ID_ActiveMQMapMessage :
packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
break;
case MessageAck.ID_MessageAck :
packetTypeStr = "ACTIVEMQ_MSG_ACK";
break;
case Response.ID_Response :
packetTypeStr = "RESPONSE";
break;
case ConsumerInfo.ID_ConsumerInfo :
packetTypeStr = "CONSUMER_INFO";
break;
case ProducerInfo.ID_ProducerInfo :
packetTypeStr = "PRODUCER_INFO";
break;
case TransactionInfo.ID_TransactionInfo :
packetTypeStr = "TRANSACTION_INFO";
break;
case BrokerInfo.ID_BrokerInfo :
packetTypeStr = "BROKER_INFO";
break;
case ConnectionInfo.ID_ConnectionInfo :
packetTypeStr = "CONNECTION_INFO";
break;
case SessionInfo.ID_SessionInfo :
packetTypeStr = "SESSION_INFO";
break;
case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo :
packetTypeStr = "DURABLE_UNSUBSCRIBE";
break;
case IntegerResponse.ID_IntegerResponse :
packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
break;
case WireFormatInfo.ID_WireFormatInfo :
packetTypeStr = "WIRE_FORMAT_INFO";
break;
case RemoveInfo.ID_RemoveInfo :
packetTypeStr = "REMOVE_INFO";
break;
case KeepAliveInfo.ID_KeepAliveInfo :
packetTypeStr = "KEEP_ALIVE";
break;
}
return packetTypeStr;
}
}
namespace OpenWire.Client.Core
{
/// <summary>
/// Summary description for AbstractCommand.
/// </summary>
public abstract class AbstractCommand : Command
{
private short commandId;
private bool responseRequired;
protected AbstractCommand()
{
}
public virtual byte GetDataStructureType()
{
return 0;
}
public virtual bool IsMarshallAware()
{
return false;
}
// Properties
public short CommandId
{
get { return commandId; }
set { this.commandId = value; }
}
public bool ResponseRequired
{
get { return responseRequired; }
set { this.responseRequired = value; }
}
public static String GetDataStructureTypeAsString(int type)
{
String packetTypeStr = "";
switch (type)
{
case ActiveMQMessage.ID_ActiveMQMessage :
packetTypeStr = "ACTIVEMQ_MESSAGE";
break;
case ActiveMQTextMessage.ID_ActiveMQTextMessage :
packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
break;
case ActiveMQObjectMessage.ID_ActiveMQObjectMessage:
packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
break;
case ActiveMQBytesMessage.ID_ActiveMQBytesMessage :
packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
break;
case ActiveMQStreamMessage.ID_ActiveMQStreamMessage :
packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
break;
case ActiveMQMapMessage.ID_ActiveMQMapMessage :
packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
break;
case MessageAck.ID_MessageAck :
packetTypeStr = "ACTIVEMQ_MSG_ACK";
break;
case Response.ID_Response :
packetTypeStr = "RESPONSE";
break;
case ConsumerInfo.ID_ConsumerInfo :
packetTypeStr = "CONSUMER_INFO";
break;
case ProducerInfo.ID_ProducerInfo :
packetTypeStr = "PRODUCER_INFO";
break;
case TransactionInfo.ID_TransactionInfo :
packetTypeStr = "TRANSACTION_INFO";
break;
case BrokerInfo.ID_BrokerInfo :
packetTypeStr = "BROKER_INFO";
break;
case ConnectionInfo.ID_ConnectionInfo :
packetTypeStr = "CONNECTION_INFO";
break;
case SessionInfo.ID_SessionInfo :
packetTypeStr = "SESSION_INFO";
break;
case RemoveSubscriptionInfo.ID_RemoveSubscriptionInfo :
packetTypeStr = "DURABLE_UNSUBSCRIBE";
break;
case IntegerResponse.ID_IntegerResponse :
packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
break;
case WireFormatInfo.ID_WireFormatInfo :
packetTypeStr = "WIRE_FORMAT_INFO";
break;
case RemoveInfo.ID_RemoveInfo :
packetTypeStr = "REMOVE_INFO";
break;
case KeepAliveInfo.ID_KeepAliveInfo :
packetTypeStr = "KEEP_ALIVE";
break;
}
return packetTypeStr;
}
}
}

View File

@ -5,89 +5,115 @@ using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// Represents a stream of boolean flags
/// </summary>
public class BooleanStream {
byte[] data = new byte[48];
short arrayLimit;
short arrayPos;
byte bytePos;
public bool ReadBoolean() {
byte b = data[arrayPos];
bool rc = ((b >> bytePos) & 0x01) != 0;
bytePos++;
if (bytePos >= 8) {
bytePos = 0;
arrayPos++;
}
return rc;
}
public void WriteBoolean(bool value) {
if (bytePos == 0) {
arrayLimit++;
if (arrayLimit >= data.Length) {
// re-grow the array.
byte[] d = new byte[data.Length * 2];
for (int i = 0; i < data.Length; i++) {
d[i] = data[i];
}
data = d;
}
}
if (value) {
data[arrayPos] |= (byte) (0x01 << bytePos);
}
bytePos++;
if (bytePos >= 8) {
bytePos = 0;
arrayPos++;
}
}
public void Marshal(BinaryWriter dataOut) {
if (arrayLimit < 64) {
dataOut.Write((byte) arrayLimit);
} else if (arrayLimit < 256) { // max value of unsigned byte
dataOut.Write((byte) 0xC0);
dataOut.Write((byte) arrayLimit);
} else {
dataOut.Write((byte) 0xE0);
dataOut.Write((short) arrayLimit);
}
dataOut.Write(data, 0, arrayLimit);
Clear();
}
public void Unmarshal(BinaryReader dataIn) {
arrayLimit = dataIn.ReadByte();
if ((arrayLimit & 0xE0) != 0) {
arrayLimit = dataIn.ReadInt16();
} else if ((arrayLimit & 0xC0) != 0) {
arrayLimit = (short) (dataIn.ReadByte() & 0xFF);
}
if (data.Length < arrayLimit) {
data = new byte[arrayLimit];
}
dataIn.Read(data, 0, arrayLimit);
Clear();
}
public void Clear() {
arrayPos = 0;
bytePos = 0;
}
public int MarshalledSize() {
if (arrayLimit < 64) {
return 1 + arrayLimit;
} else {
return 2 + arrayLimit;
}
}
namespace OpenWire.Client.Core
{
/// <summary>
/// Represents a stream of boolean flags
/// </summary>
public class BooleanStream
{
byte[] data = new byte[48];
short arrayLimit;
short arrayPos;
byte bytePos;
public bool ReadBoolean()
{
byte b = data[arrayPos];
bool rc = ((b >> bytePos) & 0x01) != 0;
bytePos++;
if (bytePos >= 8)
{
bytePos = 0;
arrayPos++;
}
return rc;
}
public void WriteBoolean(bool value)
{
if (bytePos == 0)
{
arrayLimit++;
if (arrayLimit >= data.Length)
{
// re-grow the array.
byte[] d = new byte[data.Length * 2];
for (int i = 0; i < data.Length; i++)
{
d[i] = data[i];
}
data = d;
}
}
if (value)
{
data[arrayPos] |= (byte) (0x01 << bytePos);
}
bytePos++;
if (bytePos >= 8)
{
bytePos = 0;
arrayPos++;
}
}
public void Marshal(BinaryWriter dataOut)
{
if (arrayLimit < 64)
{
dataOut.Write((byte) arrayLimit);
}
else if (arrayLimit < 256)
{ // max value of unsigned byte
dataOut.Write((byte) 0xC0);
dataOut.Write((byte) arrayLimit);
}
else
{
dataOut.Write((byte) 0xE0);
DataStreamMarshaller.WriteShort(arrayLimit, dataOut);
}
dataOut.Write(data, 0, arrayLimit);
Clear();
}
public void Unmarshal(BinaryReader dataIn)
{
arrayLimit = DataStreamMarshaller.ReadByte(dataIn);
if ((arrayLimit & 0xE0) != 0)
{
arrayLimit = DataStreamMarshaller.ReadShort(dataIn);
}
else if ((arrayLimit & 0xC0) != 0)
{
arrayLimit = (short) (DataStreamMarshaller.ReadByte(dataIn) & 0xFF);
}
if (data.Length < arrayLimit)
{
data = new byte[arrayLimit];
}
dataIn.Read(data, 0, arrayLimit);
Clear();
}
public void Clear()
{
arrayPos = 0;
bytePos = 0;
}
public int MarshalledSize()
{
if (arrayLimit < 64)
{
return 1 + arrayLimit;
}
else
{
return 2 + arrayLimit;
}
}
}
}

View File

@ -1,22 +1,23 @@
using System;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
/// <summary>
/// An OpenWire command
/// </summary>
public interface Command : DataStructure {
/* TODO
short CommandId {
get;
set;
}
bool ResponseRequired {
get;
set;
}
*/
}
namespace OpenWire.Client.Core
{
/// <summary>
/// An OpenWire command
/// </summary>
public interface Command : DataStructure
{
short CommandId
{
get;
set;
}
bool ResponseRequired
{
get;
set;
}
}
}

View File

@ -1,401 +1,516 @@
using System;
using System.IO;
using System.Net;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// A base class with useful implementation inheritence methods
/// for creating marshallers of the OpenWire protocol
/// </summary>
public abstract class DataStreamMarshaller {
public abstract DataStructure CreateObject();
public abstract byte GetDataStructureType();
public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs) {
return 0;
}
public virtual void Marshal2(
OpenWireFormat wireFormat,
Object o,
BinaryWriter dataOut,
BooleanStream bs) {
}
public virtual void Unmarshal(
OpenWireFormat wireFormat,
Object o,
BinaryReader dataIn,
BooleanStream bs) {
}
public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs) {
if (o == 0L) {
bs.WriteBoolean(false);
bs.WriteBoolean(false);
return 0;
} else {
ulong ul = (ulong) o;
if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L) {
bs.WriteBoolean(false);
bs.WriteBoolean(true);
return 2;
} else if ((ul & 0xFFFFFFFF00000000ul) == 0L) {
bs.WriteBoolean(true);
bs.WriteBoolean(false);
return 4;
} else {
bs.WriteBoolean(true);
bs.WriteBoolean(true);
return 8;
}
}
}
public virtual void Marshal2Long(
OpenWireFormat wireFormat,
long o,
BinaryWriter dataOut,
BooleanStream bs) {
if (bs.ReadBoolean()) {
if (bs.ReadBoolean()) {
dataOut.Write(o);
} else {
dataOut.Write((int) o);
}
} else {
if (bs.ReadBoolean()) {
dataOut.Write((short) o);
}
}
}
public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs) {
if (bs.ReadBoolean()) {
if (bs.ReadBoolean()) {
return dataIn.ReadInt64();
} else {
return dataIn.ReadInt32();
}
} else {
if (bs.ReadBoolean()) {
return dataIn.ReadInt16();
} else {
return 0;
}
}
}
protected virtual DataStructure UnmarshalNestedObject(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs) {
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
protected virtual int Marshal1NestedObject(
OpenWireFormat wireFormat,
DataStructure o,
BooleanStream bs) {
return wireFormat.Marshal1NestedObject(o, bs);
}
protected virtual void Marshal2NestedObject(
OpenWireFormat wireFormat,
DataStructure o,
BinaryWriter dataOut,
BooleanStream bs) {
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
protected virtual DataStructure UnmarshalCachedObject(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs) {
/*
if (wireFormat.isCacheEnabled()) {
if (bs.ReadBoolean()) {
short index = dataIn.ReadInt16();
DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs);
wireFormat.setInUnmarshallCache(index, value);
return value;
} else {
short index = dataIn.ReadInt16();
return wireFormat.getFromUnmarshallCache(index);
}
} else {
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
*/
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
protected virtual int Marshal1CachedObject(
OpenWireFormat wireFormat,
DataStructure o,
BooleanStream bs) {
/*
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
bs.WriteBoolean(index == null);
if (index == null) {
int rc = wireFormat.Marshal1NestedObject(o, bs);
wireFormat.addToMarshallCache(o);
return 2 + rc;
} else {
return 2;
}
} else {
return wireFormat.Marshal1NestedObject(o, bs);
}
*/
return wireFormat.Marshal1NestedObject(o, bs);
}
protected virtual void Marshal2CachedObject(
OpenWireFormat wireFormat,
DataStructure o,
BinaryWriter dataOut,
BooleanStream bs) {
/*
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
if (bs.ReadBoolean()) {
dataOut.Write((short)index.shortValue());
wireFormat.Marshal2NestedObject(o, dataOut, bs);
} else {
dataOut.Write((short)index.shortValue());
}
} else {
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
*/
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs) {
if (bs.ReadBoolean()) {
if (bs.ReadBoolean()) {
int size = dataIn.ReadInt16();
byte[] data = new byte[size];
dataIn.Read(data, 0, size);
char[] text = new char[size];
for (int i = 0; i < size; i++) {
text[i] = (char) data[i];
}
return new String(text);
} else {
return dataIn.ReadString();
}
} else {
return null;
}
}
protected virtual int WriteString(String value, BooleanStream bs) {
bs.WriteBoolean(value != null);
if (value != null) {
int strlen = value.Length;
int utflen = 0;
int c = 0;
bool isOnlyAscii = true;
char[] charr = value.ToCharArray();
for (int i = 0; i < strlen; i++) {
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
isOnlyAscii = false;
} else {
isOnlyAscii = false;
utflen += 2;
}
}
if (utflen >= Int16.MaxValue)
throw new IOException("Encountered a String value that is too long to encode.");
bs.WriteBoolean(isOnlyAscii);
return utflen + 2;
} else {
return 0;
}
}
protected virtual void WriteString(String value, BinaryWriter dataOut, BooleanStream bs) {
if (bs.ReadBoolean()) {
// If we verified it only holds ascii values
if (bs.ReadBoolean()) {
dataOut.Write((short) value.Length);
dataOut.Write(value);
} else {
dataOut.Write(value);
}
}
}
protected virtual int MarshalObjectArray(
OpenWireFormat wireFormat,
DataStructure[] objects,
BooleanStream bs) {
if (objects != null) {
int rc = 0;
bs.WriteBoolean(true);
rc += 2;
for (int i = 0; i < objects.Length; i++) {
rc += Marshal1NestedObject(wireFormat, objects[i], bs);
}
return rc;
} else {
bs.WriteBoolean(false);
return 0;
}
}
protected virtual void MarshalObjectArray(
OpenWireFormat wireFormat,
DataStructure[] objects,
BinaryWriter dataOut,
BooleanStream bs) {
if (bs.ReadBoolean()) {
dataOut.Write((short) objects.Length);
for (int i = 0; i < objects.Length; i++) {
Marshal2NestedObject(wireFormat, objects[i], dataOut, bs);
}
}
}
protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag) {
if (flag) {
int size = dataIn.ReadInt32();
return dataIn.ReadBytes(size);
} else {
return null;
}
}
protected virtual byte[] ReadBytes(BinaryReader dataIn) {
int size = dataIn.ReadInt32();
return dataIn.ReadBytes(size);
}
protected virtual byte[] ReadBytes(BinaryReader dataIn, int size) {
return dataIn.ReadBytes(size);
}
protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut) {
dataOut.Write(command.Length);
dataOut.Write(command);
}
protected virtual BrokerError UnmarshalBrokerError(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs) {
if (bs.ReadBoolean()) {
String clazz = ReadString(dataIn, bs);
String message = ReadString(dataIn, bs);
BrokerError answer = new BrokerError();
answer.ExceptionClass = clazz;
answer.Message = message;
return answer;
} else {
return null;
}
}
protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs) {
if (o == null) {
bs.WriteBoolean(false);
return 0;
} else {
int rc = 0;
bs.WriteBoolean(true);
rc += WriteString(o.ExceptionClass, bs);
rc += WriteString(o.Message, bs);
return rc;
}
}
protected void MarshalBrokerError(
OpenWireFormat wireFormat,
BrokerError o,
BinaryWriter dataOut,
BooleanStream bs) {
if (bs.ReadBoolean()) {
WriteString(o.ExceptionClass, dataOut, bs);
WriteString(o.Message, dataOut, bs);
}
}
/*
protected virtual ActiveMQDestination ReadDestination(BinaryReader dataIn) {
return (ActiveMQDestination) CommandMarshallerRegistry.ReadCommand(dataIn);
}
protected virtual void WriteDestination(ActiveMQDestination command, BinaryWriter dataOut) {
CommandMarshallerRegistry.WriteCommand(command, dataOut);
}
protected virtual BrokerId[] ReadBrokerIds(BinaryReader dataIn) {
int size = dataIn.ReadInt32();
BrokerId[] answer = new BrokerId[size];
for (int i = 0; i < size; i++) {
answer[i] = (BrokerId) CommandMarshallerRegistry.BrokerIdMarshaller.ReadCommand(dataIn);
}
return answer;
}
protected virtual void WriteBrokerIds(BrokerId[] commands, BinaryWriter dataOut) {
int size = commands.Length;
dataOut.Write(size);
for (int i = 0; i < size; i++) {
CommandMarshallerRegistry.BrokerIdMarshaller.WriteCommand(commands[i], dataOut);
}
}
protected virtual BrokerInfo[] ReadBrokerInfos(BinaryReader dataIn) {
int size = dataIn.ReadInt32();
BrokerInfo[] answer = new BrokerInfo[size];
for (int i = 0; i < size; i++) {
answer[i] = (BrokerInfo) CommandMarshallerRegistry
.BrokerInfoMarshaller
.ReadCommand(dataIn);
}
return answer;
}
protected virtual void WriteBrokerInfos(BrokerInfo[] commands, BinaryWriter dataOut) {
int size = commands.Length;
dataOut.Write(size);
for (int i = 0; i < size; i++) {
CommandMarshallerRegistry.BrokerInfoMarshaller.WriteCommand(commands[i], dataOut);
}
}
protected virtual DataStructure[] ReadDataStructures(BinaryReader dataIn) {
int size = dataIn.ReadInt32();
DataStructure[] answer = new DataStructure[size];
for (int i = 0; i < size; i++) {
answer[i] = (DataStructure) CommandMarshallerRegistry.ReadCommand(dataIn);
}
return answer;
}
protected virtual void WriteDataStructures(DataStructure[] commands, BinaryWriter dataOut) {
int size = commands.Length;
dataOut.Write(size);
for (int i = 0; i < size; i++) {
CommandMarshallerRegistry.WriteCommand((Command) commands[i], dataOut);
}
}
*/
namespace OpenWire.Client.Core
{
/// <summary>
/// A base class with useful implementation inheritence methods
/// for creating marshallers of the OpenWire protocol
/// </summary>
public abstract class DataStreamMarshaller
{
public abstract DataStructure CreateObject();
public abstract byte GetDataStructureType();
public virtual int Marshal1(OpenWireFormat wireFormat, Object o, BooleanStream bs)
{
return 0;
}
public virtual void Marshal2(
OpenWireFormat wireFormat,
Object o,
BinaryWriter dataOut,
BooleanStream bs)
{
}
public virtual void Unmarshal(
OpenWireFormat wireFormat,
Object o,
BinaryReader dataIn,
BooleanStream bs)
{
}
protected virtual DataStructure UnmarshalNestedObject(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs)
{
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
protected virtual int Marshal1NestedObject(
OpenWireFormat wireFormat,
DataStructure o,
BooleanStream bs)
{
return wireFormat.Marshal1NestedObject(o, bs);
}
protected virtual void Marshal2NestedObject(
OpenWireFormat wireFormat,
DataStructure o,
BinaryWriter dataOut,
BooleanStream bs)
{
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
protected virtual DataStructure UnmarshalCachedObject(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs)
{
/*
if (wireFormat.isCacheEnabled()) {
if (bs.ReadBoolean()) {
short index = dataInReadShort(dataIn)Int16();
DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs);
wireFormat.setInUnmarshallCache(index, value);
return value;
} else {
short index = ReadShort(dataIn);
return wireFormat.getFromUnmarshallCache(index);
}
} else {
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
*/
return wireFormat.UnmarshalNestedObject(dataIn, bs);
}
protected virtual int Marshal1CachedObject(
OpenWireFormat wireFormat,
DataStructure o,
BooleanStream bs)
{
/*
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
bs.WriteBoolean(index == null);
if (index == null) {
int rc = wireFormat.Marshal1NestedObject(o, bs);
wireFormat.addToMarshallCache(o);
return 2 + rc;
} else {
return 2;
}
} else {
return wireFormat.Marshal1NestedObject(o, bs);
}
*/
return wireFormat.Marshal1NestedObject(o, bs);
}
protected virtual void Marshal2CachedObject(
OpenWireFormat wireFormat,
DataStructure o,
BinaryWriter dataOut,
BooleanStream bs)
{
/*
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
if (bs.ReadBoolean()) {
WriteShort(index.shortValue(), dataOut);
wireFormat.Marshal2NestedObject(o, dataOut, bs);
} else {
WriteShort(index.shortValue(), dataOut);
}
} else {
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
*/
wireFormat.Marshal2NestedObject(o, dataOut, bs);
}
protected virtual String ReadString(BinaryReader dataIn, BooleanStream bs)
{
if (bs.ReadBoolean())
{
if (bs.ReadBoolean())
{
int size = ReadShort(dataIn);
byte[] data = new byte[size];
dataIn.Read(data, 0, size);
char[] text = new char[size];
for (int i = 0; i < size; i++)
{
text[i] = (char) data[i];
}
return new String(text);
}
else
{
return dataIn.ReadString();
}
}
else
{
return null;
}
}
protected virtual int WriteString(String value, BooleanStream bs)
{
bs.WriteBoolean(value != null);
if (value != null)
{
int strlen = value.Length;
// TODO until we get UTF8 working, lets just force ASCII
bs.WriteBoolean(true);
return strlen + 2;
/*
int utflen = 0;
int c = 0;
bool isOnlyAscii = true;
char[] charr = value.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
isOnlyAscii = false;
}
else
{
isOnlyAscii = false;
utflen += 2;
}
}
if (utflen >= Int16.MaxValue)
throw new IOException("Encountered a String value that is too long to encode.");
bs.WriteBoolean(isOnlyAscii);
return utflen + 2;
*/
}
else
{
return 0;
}
}
public static void WriteString(String value, BinaryWriter dataOut, BooleanStream bs)
{
if (bs.ReadBoolean())
{
// If we verified it only holds ascii values
if (bs.ReadBoolean())
{
WriteShort((short) value.Length, dataOut);
// now lets write the bytes
char[] chars = value.ToCharArray();
for (int i = 0; i < chars.Length; i++) {
WriteByte((byte) chars[i], dataOut);
}
}
else
{
// TODO how should we properly write a String so that Java will grok it???
dataOut.Write(value);
}
}
}
public static byte ReadByte(BinaryReader dataIn)
{
return dataIn.ReadByte();
}
public static char ReadChar(BinaryReader dataIn)
{
return (char) ReadShort(dataIn);
}
public static short ReadShort(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt16());
}
public static int ReadInt(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt32());
}
public static long ReadLong(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt64());
}
public static void WriteByte(byte value, BinaryWriter dataOut)
{
dataOut.Write(value);
}
public static void WriteChar(char value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
public static void WriteShort(short value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
public static void WriteInt(int value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
/// <summary>
/// Switches from one endian to the other
/// </summary>
/// <param name="value">An int</param>
/// <returns>An int</retutns>
public static int SwitchEndian(int x)
{
return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24));
}
public static short SwitchEndian(short x)
{
int low = x & 0xff;
int high = x & 0xff00;
return(short)(high >> 8 | low << 8);
}
public static long SwitchEndian(long x)
{
long answer = 0;
for (int i = 0; i < 8; i++) {
long lowest = x & 0xff;
x >>= 8;
answer <<= 8;
answer += lowest;
}
return answer;
}
public static void WriteLong(long value, BinaryWriter dataOut)
{
dataOut.Write(IPAddress.HostToNetworkOrder(value));
}
public virtual int Marshal1Long(OpenWireFormat wireFormat, long o, BooleanStream bs)
{
if (o == 0L)
{
bs.WriteBoolean(false);
bs.WriteBoolean(false);
return 0;
}
else
{
ulong ul = (ulong) o;
if ((ul & 0xFFFFFFFFFFFF0000ul) == 0L)
{
bs.WriteBoolean(false);
bs.WriteBoolean(true);
return 2;
}
else if ((ul & 0xFFFFFFFF00000000ul) == 0L)
{
bs.WriteBoolean(true);
bs.WriteBoolean(false);
return 4;
}
else
{
bs.WriteBoolean(true);
bs.WriteBoolean(true);
return 8;
}
}
}
public virtual void Marshal2Long(
OpenWireFormat wireFormat,
long o,
BinaryWriter dataOut,
BooleanStream bs)
{
if (bs.ReadBoolean())
{
if (bs.ReadBoolean())
{
WriteLong(o, dataOut);
}
else
{
WriteInt((int) o, dataOut);
}
}
else
{
if (bs.ReadBoolean())
{
WriteShort((short) o, dataOut);
}
}
}
public virtual long UnmarshalLong(OpenWireFormat wireFormat, BinaryReader dataIn, BooleanStream bs)
{
if (bs.ReadBoolean())
{
if (bs.ReadBoolean())
{
return ReadLong(dataIn);
}
else
{
return ReadInt(dataIn);
}
}
else
{
if (bs.ReadBoolean())
{
return ReadShort(dataIn);
}
else
{
return 0;
}
}
}
protected virtual int MarshalObjectArray(
OpenWireFormat wireFormat,
DataStructure[] objects,
BooleanStream bs)
{
if (objects != null)
{
int rc = 0;
bs.WriteBoolean(true);
rc += 2;
for (int i = 0; i < objects.Length; i++)
{
rc += Marshal1NestedObject(wireFormat, objects[i], bs);
}
return rc;
}
else
{
bs.WriteBoolean(false);
return 0;
}
}
protected virtual void MarshalObjectArray(
OpenWireFormat wireFormat,
DataStructure[] objects,
BinaryWriter dataOut,
BooleanStream bs)
{
if (bs.ReadBoolean())
{
WriteShort((short) objects.Length, dataOut);
for (int i = 0; i < objects.Length; i++)
{
Marshal2NestedObject(wireFormat, objects[i], dataOut, bs);
}
}
}
protected virtual byte[] ReadBytes(BinaryReader dataIn, bool flag)
{
if (flag)
{
int size = ReadInt(dataIn);
return dataIn.ReadBytes(size);
}
else
{
return null;
}
}
protected virtual byte[] ReadBytes(BinaryReader dataIn)
{
int size = ReadInt(dataIn);
return dataIn.ReadBytes(size);
}
protected virtual byte[] ReadBytes(BinaryReader dataIn, int size)
{
return dataIn.ReadBytes(size);
}
protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut)
{
WriteInt(command.Length, dataOut);
dataOut.Write(command);
}
protected virtual BrokerError UnmarshalBrokerError(
OpenWireFormat wireFormat,
BinaryReader dataIn,
BooleanStream bs)
{
if (bs.ReadBoolean())
{
String clazz = ReadString(dataIn, bs);
String message = ReadString(dataIn, bs);
BrokerError answer = new BrokerError();
answer.ExceptionClass = clazz;
answer.Message = message;
return answer;
}
else
{
return null;
}
}
protected int MarshalBrokerError(OpenWireFormat wireFormat, BrokerError o, BooleanStream bs)
{
if (o == null)
{
bs.WriteBoolean(false);
return 0;
}
else
{
int rc = 0;
bs.WriteBoolean(true);
rc += WriteString(o.ExceptionClass, bs);
rc += WriteString(o.Message, bs);
return rc;
}
}
protected void MarshalBrokerError(
OpenWireFormat wireFormat,
BrokerError o,
BinaryWriter dataOut,
BooleanStream bs)
{
if (bs.ReadBoolean())
{
WriteString(o.ExceptionClass, dataOut, bs);
WriteString(o.Message, dataOut, bs);
}
}
}
}

View File

@ -4,52 +4,67 @@ using System.Threading;
using OpenWire.Client;
using OpenWire.Client.Commands;
namespace OpenWire.Client.Core {
/// <summary>
/// Handles asynchronous responses
/// </summary>
public class FutureResponse : IAsyncResult {
private Response response;
private Mutex asyncWaitHandle = new Mutex();
private Object semaphore = new Object();
private int maxWait = 3000;
private bool isCompleted;
public WaitHandle AsyncWaitHandle {
get { return asyncWaitHandle; }
namespace OpenWire.Client.Core
{
/// <summary>
/// Handles asynchronous responses
/// </summary>
public class FutureResponse : IAsyncResult
{
private Response response;
private Mutex asyncWaitHandle = new Mutex();
private Object semaphore = new Object();
private int maxWait = 3000;
private bool isCompleted;
public WaitHandle AsyncWaitHandle
{
get { return asyncWaitHandle; }
}
public object AsyncState
{
get { return response; }
set { Response = (Response) value; }
}
public bool IsCompleted
{
get { return isCompleted; }
}
public bool CompletedSynchronously
{
get { return false; }
}
public Response Response
{
// Blocks the caller until a value has been set
get {
while (response == null)
{
try {
lock (semaphore)
{
Monitor.Wait(semaphore, maxWait);
}
}
catch (Exception e) {
Console.WriteLine("Caught while waiting on monitor: " + e);
}
}
public object AsyncState {
get { return response; }
set { Response = (Response) value; }
return response;
}
set {
lock (semaphore)
{
response = value;
isCompleted = true;
Monitor.PulseAll(semaphore);
}
public bool IsCompleted {
get { return isCompleted; }
}
public bool CompletedSynchronously {
get { return false; }
}
public Response Response {
// Blocks the caller until a value has been set
get {
lock (semaphore) {
while (response == null) {
Monitor.Wait(semaphore, maxWait);
}
return response;
}
}
set {
lock (semaphore) {
response = value;
isCompleted = true;
Monitor.PulseAll(semaphore);
}
}
}
}
}
}
}
}

View File

@ -5,129 +5,166 @@ using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// Represents the wire format
/// </summary>
public class OpenWireFormat {
private DataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
public void addMarshaller(DataStreamMarshaller marshaller)
namespace OpenWire.Client.Core
{
/// <summary>
/// Represents the wire format
/// </summary>
public class OpenWireFormat
{
private DataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
public OpenWireFormat()
{
dataMarshallers = new DataStreamMarshaller[256];
MarshallerFactory factory = new MarshallerFactory();
factory.configure(this);
}
public void addMarshaller(DataStreamMarshaller marshaller)
{
byte type = marshaller.GetDataStructureType();
dataMarshallers[type & 0xFF] = marshaller;
}
public void Marshal(Object o, BinaryWriter ds)
{
int size = 1;
if (o != null)
{
DataStructure c = (DataStructure) o;
byte type = c.GetDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
BooleanStream bs = new BooleanStream();
size += dsm.Marshal1(this, c, bs);
size += bs.MarshalledSize();
DataStreamMarshaller.WriteInt(size, ds);
DataStreamMarshaller.WriteByte(type, ds);
bs.Marshal(ds);
dsm.Marshal2(this, c, ds, bs);
}
else
{
DataStreamMarshaller.WriteInt(size, ds);
DataStreamMarshaller.WriteByte(NULL_TYPE, ds);
}
}
public Object Unmarshal(BinaryReader dis)
{
int size = DataStreamMarshaller.ReadInt(dis);
byte dataType = DataStreamMarshaller.ReadByte(dis);
if (dataType != NULL_TYPE)
{
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
Object data = dsm.CreateObject();
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
dsm.Unmarshal(this, data, dis, bs);
return data;
}
else
{
return null;
}
}
public int Marshal1NestedObject(DataStructure o, BooleanStream bs)
{
bs.WriteBoolean(o != null);
if (o == null)
return 0;
if (o.IsMarshallAware())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
bs.WriteBoolean(sequence != null);
if (sequence != null)
{
dataMarshallers[marshaller.GetDataStructureType()] = marshaller;
return 1 + sequence.Length;
}
}
byte type = o.GetDataStructureType();
if (type == 0) {
throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
}
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
return 1 + dsm.Marshal1(this, o, bs);
}
public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs)
{
if (!bs.ReadBoolean())
return ;
byte type = o.GetDataStructureType();
DataStreamMarshaller.WriteByte(type, ds);
if (o.IsMarshallAware() && bs.ReadBoolean())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
ds.Write(sequence, 0, sequence.Length);
}
else
{
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
dsm.Marshal2(this, o, ds, bs);
}
}
public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
{
if (bs.ReadBoolean())
{
byte dataType = DataStreamMarshaller.ReadByte(dis);
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
DataStructure data = dsm.CreateObject();
if (data.IsMarshallAware() && bs.ReadBoolean())
{
DataStreamMarshaller.ReadInt(dis);
DataStreamMarshaller.ReadByte(dis);
BooleanStream bs2 = new BooleanStream();
bs2.Unmarshal(dis);
dsm.Unmarshal(this, data, dis, bs2);
// TODO: extract the sequence from the dis and associate it.
// MarshallAware ma = (MarshallAware)data
// ma.setCachedMarshalledForm(this, sequence);
}
else
{
dsm.Unmarshal(this, data, dis, bs);
}
public void Marshal(Object o, BinaryWriter ds) {
int size = 1;
if (o != null) {
DataStructure c = (DataStructure) o;
byte type = c.GetDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
BooleanStream bs = new BooleanStream();
size += dsm.Marshal1(this, c, bs);
size += bs.MarshalledSize();
ds.Write(size);
ds.Write(type);
bs.Marshal(ds);
dsm.Marshal2(this, c, ds, bs);
} else {
ds.Write(size);
ds.Write(NULL_TYPE);
}
}
public Object Unmarshal(BinaryReader dis) {
byte dataType = dis.ReadByte();
if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
Object data = dsm.CreateObject();
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
dsm.Unmarshal(this, data, dis, bs);
return data;
} else {
return null;
}
}
public int Marshal1NestedObject(DataStructure o, BooleanStream bs) {
bs.WriteBoolean(o != null);
if (o == null)
return 0;
if (o.IsMarshallAware()) {
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
bs.WriteBoolean(sequence != null);
if (sequence != null) {
return 1 + sequence.Length;
}
}
byte type = o.GetDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
return 1 + dsm.Marshal1(this, o, bs);
}
public void Marshal2NestedObject(DataStructure o, BinaryWriter ds, BooleanStream bs) {
if (!bs.ReadBoolean())
return ;
byte type = o.GetDataStructureType();
ds.Write(type);
if (o.IsMarshallAware() && bs.ReadBoolean()) {
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
ds.Write(sequence, 0, sequence.Length);
} else {
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
dsm.Marshal2(this, o, ds, bs);
}
}
public DataStructure UnmarshalNestedObject(BinaryReader dis, BooleanStream bs) {
if (bs.ReadBoolean()) {
byte dataType = dis.ReadByte();
DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
DataStructure data = dsm.CreateObject();
if (data.IsMarshallAware() && bs.ReadBoolean()) {
dis.ReadInt32();
dis.ReadByte();
BooleanStream bs2 = new BooleanStream();
bs2.Unmarshal(dis);
dsm.Unmarshal(this, data, dis, bs2);
// TODO: extract the sequence from the dis and associate it.
// MarshallAware ma = (MarshallAware)data
// ma.setCachedMarshalledForm(this, sequence);
} else {
dsm.Unmarshal(this, data, dis, bs);
}
return data;
} else {
return null;
}
}
return data;
}
else
{
return null;
}
}
}
}

View File

@ -12,134 +12,193 @@ using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
public class SocketTransport : ITransport {
private readonly object transmissionLock = new object();
private readonly Socket socket;
private readonly BinaryReader socketReader;
private readonly BinaryWriter socketWriter;
private readonly Thread readThread;
private bool closed;
private IDictionary requestMap = new Hashtable(); // TODO threadsafe
private short nextCommandId;
public event CommandHandler Command;
public event ExceptionHandler Exception;
private OpenWireFormat wireformat = new OpenWireFormat();
public SocketTransport(string host, int port) {
Console.WriteLine("Opening socket to: " + host + " on port: " + port);
socket = Connect(host, port);
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
}
public void Oneway(Command command) {
BaseCommand baseCommand = (BaseCommand) command;
baseCommand.CommandId = GetNextCommandId();
baseCommand.ResponseRequired = false;
Send(command);
}
public FutureResponse AsyncRequest(Command command) {
BaseCommand baseCommand = (BaseCommand) command;
baseCommand.CommandId = GetNextCommandId();
baseCommand.ResponseRequired = true;
Send(command);
FutureResponse future = new FutureResponse();
requestMap[baseCommand.CommandId] = future;
return future;
}
public Response Request(Command command) {
FutureResponse response = AsyncRequest(command);
return response.Response;
}
public void Dispose() {
Console.WriteLine("Closing the socket");
lock (transmissionLock) {
socket.Close();
closed = true;
namespace OpenWire.Client.Core
{
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
public class SocketTransport : ITransport
{
private readonly object transmissionLock = new object();
private readonly Socket socket;
private OpenWireFormat wireformat = new OpenWireFormat();
private readonly BinaryReader socketReader;
private readonly BinaryWriter socketWriter;
private readonly Thread readThread;
private bool closed;
private IDictionary requestMap = new Hashtable(); // TODO threadsafe
private short nextCommandId;
public event CommandHandler Command;
public event ExceptionHandler Exception;
public SocketTransport(string host, int port)
{
Console.WriteLine("Opening socket to: " + host + " on port: " + port);
socket = Connect(host, port);
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
/*
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
*/
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
}
public void Oneway(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = false;
Send(command);
}
public FutureResponse AsyncRequest(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = true;
Send(command);
FutureResponse future = new FutureResponse();
requestMap[command.CommandId] = future;
return future;
}
public Response Request(Command command)
{
FutureResponse response = AsyncRequest(command);
return response.Response;
}
public void Dispose()
{
Console.WriteLine("Closing the socket");
lock (transmissionLock)
{
socket.Close();
closed = true;
}
socketWriter.Close();
socketReader.Close();
}
public void ReadLoop()
{
Console.WriteLine("Starting to read commands from ActiveMQ");
while (!closed)
{
Command command = null;
try
{
command = (Command) wireformat.Unmarshal(socketReader);
if (command != null)
{
Console.WriteLine("Received command: " + command);
if (command is RemoveInfo)
{
RemoveInfo info = (RemoveInfo) command;
Console.WriteLine("Remove CommandId: " + info.CommandId);
Console.WriteLine("Remove ObjectID: " + info.ObjectId);
}
socketWriter.Close();
socketReader.Close();
}
}
public void ReadLoop() {
Console.WriteLine("Starting to read commands from ActiveMQ");
while (!closed) {
BaseCommand command = null;
try {
command = (BaseCommand) wireformat.Unmarshal(socketReader);
} catch (ObjectDisposedException e) {
// stream closed
break;
}
if (command is Response) {
Console.WriteLine("Received response!: " + command);
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CommandId];
if (future != null) {
if (response is ExceptionResponse) {
ExceptionResponse er = (ExceptionResponse) response;
if (this.Exception != null) {
Exception e = new BrokerException(er.Exception);
this.Exception(this, e);
}
} else {
future.Response = response;
}
} else {
Console.WriteLine("Unknown response ID: " + response.CommandId);
}
} else {
if (this.Command != null) {
this.Command(this, command);
}
}
}
catch (EndOfStreamException e)
{
// stream closed
break;
}
// Implementation methods
protected void Send(Command command) {
lock (transmissionLock) {
wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
catch (ObjectDisposedException e)
{
// stream closed
break;
}
protected short GetNextCommandId() {
lock (transmissionLock) {
return++nextCommandId;
}
}
protected Socket Connect(string host, int port) {
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
IPHostEntry hostEntry = Dns.Resolve(host);
foreach (IPAddress address in hostEntry.AddressList) {
Socket socket = new Socket(
address.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
socket.Connect(new IPEndPoint(address, port));
if (socket.Connected) {
return socket;
}
if (command is Response)
{
Console.WriteLine("Received response!: " + command);
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CommandId];
if (future != null)
{
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
Exception e = new BrokerException(er.Exception);
if (this.Exception != null)
{
this.Exception(this, e);
}
else
{
throw e;
}
}
throw new SocketException();
}
}
else
{
future.Response = response;
}
}
else
{
Console.WriteLine("Unknown response ID: " + response.CommandId);
}
}
else
{
if (this.Command != null)
{
this.Command(this, command);
}
else
{
Console.WriteLine("No handler available to process command: " + command);
}
}
}
}
// Implementation methods
protected void Send(Command command)
{
lock (transmissionLock)
{
Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired);
wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
}
protected short GetNextCommandId()
{
lock (transmissionLock)
{
return++nextCommandId;
}
}
protected Socket Connect(string host, int port)
{
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
IPHostEntry hostEntry = Dns.Resolve(host);
foreach (IPAddress address in hostEntry.AddressList)
{
Socket socket = new Socket(
address.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
socket.Connect(new IPEndPoint(address, port));
if (socket.Connected)
{
return socket;
}
}
throw new SocketException();
}
}
}

View File

@ -1,21 +1,30 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
namespace OpenWire.Client
{
public delegate void MessageHandler(IMessage message);
/// <summary>
/// A consumer of messages
/// </summary>
public interface IMessageConsumer : IDisposable
{
/// <summary>
/// A consumer of messages
/// Waits until a message is available and returns it
/// </summary>
public interface IMessageConsumer : IDisposable {
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null
/// </summary>
IMessage ReceiveNoWait();
}
IMessage Receive();
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null
/// </summary>
IMessage ReceiveNoWait();
/// <summary>
/// An asynchronous listener which can be used to consume messages asynchronously
/// </summary>
event MessageHandler Listener;
}
}

View File

@ -45,7 +45,7 @@ namespace OpenWire.Client.IO
base.Unmarshal(wireFormat, o, dataIn, bs);
BaseCommand info = (BaseCommand)o;
info.CommandId = dataIn.ReadInt16();
info.CommandId = DataStreamMarshaller.ReadShort(dataIn);
info.ResponseRequired = bs.ReadBoolean();
}
@ -70,7 +70,7 @@ namespace OpenWire.Client.IO
base.Marshal2(wireFormat, o, dataOut, bs);
BaseCommand info = (BaseCommand)o;
dataOut.Write((short)info.CommandId);
DataStreamMarshaller.WriteShort(info.CommandId, dataOut);
bs.ReadBoolean();
}

View File

@ -60,7 +60,7 @@ namespace OpenWire.Client.IO
info.BrokerURL = ReadString(dataIn, bs);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerInfo[] value = new BrokerInfo[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerInfo) UnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -62,7 +62,7 @@ namespace OpenWire.Client.IO
info.UserName = ReadString(dataIn, bs);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -59,17 +59,17 @@ namespace OpenWire.Client.IO
info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Browser = bs.ReadBoolean();
info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.PrefetchSize = dataIn.ReadInt32();
info.PrefetchSize = DataStreamMarshaller.ReadInt(dataIn);
info.DispatchAsync = bs.ReadBoolean();
info.Selector = ReadString(dataIn, bs);
info.SubcriptionName = ReadString(dataIn, bs);
info.NoLocal = bs.ReadBoolean();
info.Exclusive = bs.ReadBoolean();
info.Retroactive = bs.ReadBoolean();
info.Priority = dataIn.ReadByte();
info.Priority = DataStreamMarshaller.ReadByte(dataIn);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@ -116,14 +116,14 @@ namespace OpenWire.Client.IO
Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
bs.ReadBoolean();
Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
dataOut.Write((int) info.PrefetchSize);
DataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut);
bs.ReadBoolean();
WriteString(info.Selector, dataOut, bs);
WriteString(info.SubcriptionName, dataOut, bs);
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
dataOut.Write((byte) info.Priority);
DataStreamMarshaller.WriteByte(info.Priority, dataOut);
MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);
bs.ReadBoolean();

View File

@ -58,7 +58,7 @@ namespace OpenWire.Client.IO
DataArrayResponse info = (DataArrayResponse)o;
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
DataStructure[] value = new DataStructure[size];
for( int i=0; i < size; i++ ) {
value[i] = (DataStructure) UnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -58,11 +58,11 @@ namespace OpenWire.Client.IO
DestinationInfo info = (DestinationInfo)o;
info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.OperationType = dataIn.ReadByte();
info.OperationType = DataStreamMarshaller.ReadByte(dataIn);
info.Timeout = UnmarshalLong(wireFormat, dataIn, bs);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@ -100,7 +100,7 @@ namespace OpenWire.Client.IO
DestinationInfo info = (DestinationInfo)o;
Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs);
Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
dataOut.Write((byte) info.OperationType);
DataStreamMarshaller.WriteByte(info.OperationType, dataOut);
Marshal2Long(wireFormat, info.Timeout, dataOut, bs);
MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);

View File

@ -56,7 +56,7 @@ namespace OpenWire.Client.IO
base.Unmarshal(wireFormat, o, dataIn, bs);
IntegerResponse info = (IntegerResponse)o;
info.Result = dataIn.ReadInt32();
info.Result = DataStreamMarshaller.ReadInt(dataIn);
}
@ -79,7 +79,7 @@ namespace OpenWire.Client.IO
base.Marshal2(wireFormat, o, dataOut, bs);
IntegerResponse info = (IntegerResponse)o;
dataOut.Write((int) info.Result);
DataStreamMarshaller.WriteInt(info.Result, dataOut);
}
}

View File

@ -57,7 +57,7 @@ namespace OpenWire.Client.IO
JournalTransaction info = (JournalTransaction)o;
info.TransactionId = (TransactionId) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.Type = dataIn.ReadByte();
info.Type = DataStreamMarshaller.ReadByte(dataIn);
info.WasPrepared = bs.ReadBoolean();
}
@ -84,7 +84,7 @@ namespace OpenWire.Client.IO
JournalTransaction info = (JournalTransaction)o;
Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs);
dataOut.Write((byte) info.Type);
DataStreamMarshaller.WriteByte(info.Type, dataOut);
bs.ReadBoolean();
}

View File

@ -59,10 +59,10 @@ namespace OpenWire.Client.IO
info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.AckType = dataIn.ReadByte();
info.AckType = DataStreamMarshaller.ReadByte(dataIn);
info.FirstMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.LastMessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.MessageCount = dataIn.ReadInt32();
info.MessageCount = DataStreamMarshaller.ReadInt(dataIn);
}
@ -93,10 +93,10 @@ namespace OpenWire.Client.IO
Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs);
Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
dataOut.Write((byte) info.AckType);
DataStreamMarshaller.WriteByte(info.AckType, dataOut);
Marshal2NestedObject(wireFormat, info.FirstMessageId, dataOut, bs);
Marshal2NestedObject(wireFormat, info.LastMessageId, dataOut, bs);
dataOut.Write((int) info.MessageCount);
DataStreamMarshaller.WriteInt(info.MessageCount, dataOut);
}
}

View File

@ -59,7 +59,7 @@ namespace OpenWire.Client.IO
info.ConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Message = (Message) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.RedeliveryCounter = dataIn.ReadInt32();
info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn);
}
@ -88,7 +88,7 @@ namespace OpenWire.Client.IO
Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs);
Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs);
Marshal2NestedObject(wireFormat, info.Message, dataOut, bs);
dataOut.Write((int) info.RedeliveryCounter);
DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
}
}

View File

@ -52,11 +52,11 @@ namespace OpenWire.Client.IO
info.MessageId = (MessageId) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.OriginalTransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.GroupID = ReadString(dataIn, bs);
info.GroupSequence = dataIn.ReadInt32();
info.GroupSequence = DataStreamMarshaller.ReadInt(dataIn);
info.CorrelationId = ReadString(dataIn, bs);
info.Persistent = bs.ReadBoolean();
info.Expiration = UnmarshalLong(wireFormat, dataIn, bs);
info.Priority = dataIn.ReadByte();
info.Priority = DataStreamMarshaller.ReadByte(dataIn);
info.ReplyTo = (ActiveMQDestination) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.Timestamp = UnmarshalLong(wireFormat, dataIn, bs);
info.Type = ReadString(dataIn, bs);
@ -65,10 +65,10 @@ namespace OpenWire.Client.IO
info.DataStructure = (DataStructure) UnmarshalNestedObject(wireFormat, dataIn, bs);
info.TargetConsumerId = (ConsumerId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Compressed = bs.ReadBoolean();
info.RedeliveryCounter = dataIn.ReadInt32();
info.RedeliveryCounter = DataStreamMarshaller.ReadInt(dataIn);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);
@ -134,26 +134,26 @@ namespace OpenWire.Client.IO
Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs);
Marshal2CachedObject(wireFormat, info.OriginalTransactionId, dataOut, bs);
WriteString(info.GroupID, dataOut, bs);
dataOut.Write((int) info.GroupSequence);
DataStreamMarshaller.WriteInt(info.GroupSequence, dataOut);
WriteString(info.CorrelationId, dataOut, bs);
bs.ReadBoolean();
Marshal2Long(wireFormat, info.Expiration, dataOut, bs);
dataOut.Write((byte) info.Priority);
DataStreamMarshaller.WriteByte(info.Priority, dataOut);
Marshal2NestedObject(wireFormat, info.ReplyTo, dataOut, bs);
Marshal2Long(wireFormat, info.Timestamp, dataOut, bs);
WriteString(info.Type, dataOut, bs);
if(bs.ReadBoolean()) {
dataOut.Write((int)info.Content.Length);
DataStreamMarshaller.WriteInt(info.Content.Length, dataOut);
dataOut.Write(info.Content);
}
if(bs.ReadBoolean()) {
dataOut.Write((int)info.MarshalledProperties.Length);
DataStreamMarshaller.WriteInt(info.MarshalledProperties.Length, dataOut);
dataOut.Write(info.MarshalledProperties);
}
Marshal2NestedObject(wireFormat, info.DataStructure, dataOut, bs);
Marshal2CachedObject(wireFormat, info.TargetConsumerId, dataOut, bs);
bs.ReadBoolean();
dataOut.Write((int) info.RedeliveryCounter);
DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs);
Marshal2Long(wireFormat, info.Arrival, dataOut, bs);
WriteString(info.UserID, dataOut, bs);

View File

@ -60,7 +60,7 @@ namespace OpenWire.Client.IO
info.Destination = (ActiveMQDestination) UnmarshalCachedObject(wireFormat, dataIn, bs);
if (bs.ReadBoolean()) {
short size = dataIn.ReadInt16();
short size = DataStreamMarshaller.ReadShort(dataIn);
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) UnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -56,7 +56,7 @@ namespace OpenWire.Client.IO
base.Unmarshal(wireFormat, o, dataIn, bs);
Response info = (Response)o;
info.CorrelationId = dataIn.ReadInt16();
info.CorrelationId = DataStreamMarshaller.ReadShort(dataIn);
}
@ -79,7 +79,7 @@ namespace OpenWire.Client.IO
base.Marshal2(wireFormat, o, dataOut, bs);
Response info = (Response)o;
dataOut.Write((short)info.CorrelationId);
DataStreamMarshaller.WriteShort(info.CorrelationId, dataOut);
}
}

View File

@ -58,7 +58,7 @@ namespace OpenWire.Client.IO
TransactionInfo info = (TransactionInfo)o;
info.ConnectionId = (ConnectionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.TransactionId = (TransactionId) UnmarshalCachedObject(wireFormat, dataIn, bs);
info.Type = dataIn.ReadByte();
info.Type = DataStreamMarshaller.ReadByte(dataIn);
}
@ -85,7 +85,7 @@ namespace OpenWire.Client.IO
TransactionInfo info = (TransactionInfo)o;
Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs);
Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs);
dataOut.Write((byte) info.Type);
DataStreamMarshaller.WriteByte(info.Type, dataOut);
}
}

View File

@ -57,8 +57,8 @@ namespace OpenWire.Client.IO
WireFormatInfo info = (WireFormatInfo)o;
info.Magic = ReadBytes(dataIn, 8);
info.Version = dataIn.ReadInt32();
info.Options = dataIn.ReadInt32();
info.Version = DataStreamMarshaller.ReadInt(dataIn);
info.Options = DataStreamMarshaller.ReadInt(dataIn);
}
@ -82,8 +82,8 @@ namespace OpenWire.Client.IO
WireFormatInfo info = (WireFormatInfo)o;
dataOut.Write(info.Magic, 0, 8);
dataOut.Write((int) info.Version);
dataOut.Write((int) info.Options);
DataStreamMarshaller.WriteInt(info.Version, dataOut);
DataStreamMarshaller.WriteInt(info.Options, dataOut);
}
}

View File

@ -56,7 +56,7 @@ namespace OpenWire.Client.IO
base.Unmarshal(wireFormat, o, dataIn, bs);
XATransactionId info = (XATransactionId)o;
info.FormatId = dataIn.ReadInt32();
info.FormatId = DataStreamMarshaller.ReadInt(dataIn);
info.GlobalTransactionId = ReadBytes(dataIn, bs.ReadBoolean());
info.BranchQualifier = ReadBytes(dataIn, bs.ReadBoolean());
@ -85,13 +85,13 @@ namespace OpenWire.Client.IO
base.Marshal2(wireFormat, o, dataOut, bs);
XATransactionId info = (XATransactionId)o;
dataOut.Write((int) info.FormatId);
DataStreamMarshaller.WriteInt(info.FormatId, dataOut);
if(bs.ReadBoolean()) {
dataOut.Write((int)info.GlobalTransactionId.Length);
DataStreamMarshaller.WriteInt(info.GlobalTransactionId.Length, dataOut);
dataOut.Write(info.GlobalTransactionId);
}
if(bs.ReadBoolean()) {
dataOut.Write((int)info.BranchQualifier.Length);
DataStreamMarshaller.WriteInt(info.BranchQualifier.Length, dataOut);
dataOut.Write(info.BranchQualifier);
}

View File

@ -1,42 +1,63 @@
using System;
using System.Threading;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
namespace OpenWire.Client
{
/// <summary>
/// An object capable of receiving messages from some destination
/// </summary>
public class MessageConsumer : IMessageConsumer
{
private Session session;
private ConsumerInfo info;
private bool closed;
public event MessageHandler Listener;
public MessageConsumer(Session session, ConsumerInfo info)
{
this.session = session;
this.info = info;
}
/// <summary>
/// An object capable of receiving messages from some destination
/// Method Dispatch
/// </summary>
public class MessageConsumer : IMessageConsumer {
private Session session;
private ConsumerInfo info;
private bool closed;
public MessageConsumer(Session session, ConsumerInfo info) {
this.session = session;
this.info = info;
}
public IMessage Receive() {
CheckClosed();
// TODO
return null;
}
public IMessage ReceiveNoWait() {
CheckClosed();
// TODO
return null;
}
public void Dispose() {
session.DisposeOf(info.ConsumerId);
closed = true;
}
protected void CheckClosed() {
if (closed) {
throw new ConnectionClosedException();
}
}
}
/// <param name="message">An ActiveMQMessage</param>
public void Dispatch(ActiveMQMessage message)
{
Console.WriteLine("Dispatching message to consumer: " + message);
}
public IMessage Receive()
{
CheckClosed();
Thread.Sleep(60000);
// TODO
return null;
}
public IMessage ReceiveNoWait()
{
CheckClosed();
// TODO
return null;
}
public void Dispose()
{
session.DisposeOf(info.ConsumerId);
closed = true;
}
protected void CheckClosed()
{
if (closed)
{
throw new ConnectionClosedException();
}
}
}
}

View File

@ -1,30 +1,52 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// An object capable of sending messages to some destination
/// </summary>
public class MessageProducer : IMessageProducer {
private Session session;
private ProducerInfo info;
public MessageProducer(Session session, ProducerInfo info) {
this.session = session;
this.info = info;
}
public void Send(IMessage message) {
Send(info.Destination, message);
}
public void Send(IDestination destination, IMessage message) {
session.DoSend(destination, message);
}
public void Dispose() {
session.DisposeOf(info.ProducerId);
}
}
namespace OpenWire.Client
{
/// <summary>
/// An object capable of sending messages to some destination
/// </summary>
public class MessageProducer : IMessageProducer
{
private Session session;
private ProducerInfo info;
private long messageCounter;
public MessageProducer(Session session, ProducerInfo info)
{
this.session = session;
this.info = info;
}
public void Send(IMessage message)
{
Send(info.Destination, message);
}
public void Send(IDestination destination, IMessage message)
{
MessageId id = new MessageId();
id.ProducerId = info.ProducerId;
lock (this)
{
id.ProducerSequenceId = ++messageCounter;
}
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
activeMessage.MessageId = id;
activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination = (ActiveMQDestination) destination;
Console.WriteLine("About to send message with MessageId: " + activeMessage.MessageId);
Console.WriteLine("About to send message with ProducerId: " + activeMessage.ProducerId);
Console.WriteLine("About to send message with Destination: " + activeMessage.Destination);
session.DoSend(destination, message);
}
public void Dispose()
{
session.DisposeOf(info.ProducerId);
}
}
}

View File

@ -2,102 +2,170 @@ using System;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
namespace OpenWire.Client
{
/// <summary>
/// Default provider of ISession
/// </summary>
public class Session : ISession
{
private Connection connection;
private AcknowledgementMode acknowledgementMode;
private SessionInfo info;
private long consumerCounter;
private long producerCounter;
private int prefetchSize = 1000;
public Session(Connection connection, SessionInfo info)
{
this.connection = connection;
this.info = info;
}
public void Dispose()
{
DisposeOf(info.SessionId);
}
public IMessageProducer CreateProducer()
{
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination)
{
ProducerInfo command = CreateProducerInfo(destination);
connection.SyncRequest(command);
return new MessageProducer(this, command);
}
public void Acknowledge(Message message)
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
{
MessageAck ack = new MessageAck();
// TODO complete packet
connection.SyncRequest(ack);
}
}
public IMessageConsumer CreateConsumer(IDestination destination)
{
return CreateConsumer(destination, null);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
connection.SyncRequest(command);
MessageConsumer consumer = new MessageConsumer(this, command);
connection.AddConsumer(command.ConsumerId, consumer);
return consumer;
}
public IQueue GetQueue(string name)
{
return new ActiveMQQueue(name);
}
public ITopic GetTopic(string name)
{
return new ActiveMQTopic(name);
}
public IMessage CreateMessage()
{
ActiveMQMessage answer = new ActiveMQMessage();
Configure(answer);
return answer;
}
public ITextMessage CreateTextMessage()
{
ActiveMQTextMessage answer = new ActiveMQTextMessage();
Configure(answer);
return answer;
}
public ITextMessage CreateTextMessage(string text)
{
ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
Configure(answer);
return answer;
}
// Implementation methods
public void DoSend(IDestination destination, IMessage message)
{
ActiveMQMessage command = ActiveMQMessage.Transform(message);
// TODO complete packet
connection.SyncRequest(command);
}
public void DisposeOf(DataStructure objectId)
{
Console.WriteLine("Disposing of session: " + objectId + " with datatype: " + objectId.GetDataStructureType());
/*
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
*/
}
public void DisposeOf(ConsumerId objectId)
{
Console.WriteLine("Disposing of consumer: " + objectId);
connection.RemoveConsumer(objectId);
/*
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
*/
}
protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
lock (this)
{
id.Value = ++consumerCounter;
}
answer.ConsumerId = id;
answer.Destination = (ActiveMQDestination) destination;
answer.Selector = selector;
answer.PrefetchSize = prefetchSize;
// TODO configure other features on the consumer
return answer;
}
protected ProducerInfo CreateProducerInfo(IDestination destination)
{
ProducerInfo answer = new ProducerInfo();
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
lock (this)
{
id.Value = ++producerCounter;
}
answer.ProducerId = id;
answer.Destination = (ActiveMQDestination) destination;
return answer;
}
/// <summary>
/// Default provider of ISession
/// Configures the message command
/// </summary>
public class Session : ISession {
private Connection connection;
private AcknowledgementMode acknowledgementMode;
private SessionInfo info;
private long consumerCounter;
public Session(Connection connection, SessionInfo info) {
this.connection = connection;
this.info = info;
}
public void Dispose() {
DisposeOf(info.SessionId);
}
public IMessageProducer CreateProducer() {
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination) {
ProducerInfo command = CreateProducerInfo(destination);
connection.SyncRequest(command);
return new MessageProducer(this, command);
}
public void Acknowledge(Message message) {
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) {
MessageAck ack = new MessageAck();
// TODO complete packet
connection.SyncRequest(ack);
}
}
public IMessageConsumer CreateConsumer(IDestination destination) {
return CreateConsumer(destination, null);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector) {
ConsumerInfo command = CreateConsumerInfo(destination, selector);
connection.SyncRequest(command);
return new MessageConsumer(this, command);
}
public IQueue GetQueue(string name) {
return new ActiveMQQueue(name);
}
public ITopic GetTopic(string name) {
return new ActiveMQTopic(name);
}
public IMessage CreateMessage() {
return new ActiveMQMessage();
}
public ITextMessage CreateTextMessage() {
return new ActiveMQTextMessage();
}
public ITextMessage CreateTextMessage(string text) {
return new ActiveMQTextMessage(text);
}
// Implementation methods
public void DoSend(IDestination destination, IMessage message) {
ActiveMQMessage command = ActiveMQMessage.Transform(message);
// TODO complete packet
connection.SyncRequest(command);
}
public void DisposeOf(DataStructure objectId) {
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
}
protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) {
ConsumerInfo answer = new ConsumerInfo();
ConsumerId consumerId = new ConsumerId();
consumerId.SessionId = info.SessionId.Value;
lock (this) {
consumerId.Value = ++consumerCounter;
}
// TODO complete packet
answer.ConsumerId = consumerId;
return answer;
}
protected ProducerInfo CreateProducerInfo(IDestination destination) {
ProducerInfo info = new ProducerInfo();
// TODO complete packet
return info;
}
}
/// <param name="activeMQMessage">An ActiveMQMessage</param>
/// <returns>An IMessage</retutns>
protected void Configure(ActiveMQMessage message)
{
}
}
}

View File

@ -4,39 +4,84 @@ using System.IO;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client {
[ TestFixture ]
public class ClientTest : TestSupport {
[ Test ]
public void SendAndSyncReceive() {
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
namespace OpenWire.Client
{
[ TestFixture ]
public class ClientTest : TestSupport
{
[ Test ]
public void CreateOpenWireFormat()
{
OpenWireFormat format = new OpenWireFormat();
Assert.IsTrue(format != null);
}
[ Test ]
public void CreateConnectionFactory()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "created valid factory: " + factory);
}
[ Test ]
public void SendAndSyncReceive()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "no factory created");
Console.WriteLine("Worked!");
using (IConnection connection = factory.CreateConnection())
{
try
{
Assert.IsTrue(connection != null, "no connection created");
Console.WriteLine("Created a connection!");
ISession session = connection.CreateSession();
Console.WriteLine("Created a session: " + session);
IDestination destination = session.GetQueue("FOO.BAR");
Assert.IsTrue(destination != null, "No queue available!");
Console.WriteLine("Using destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
Console.WriteLine("Created consumer!: " + consumer);
IMessageProducer producer = session.CreateProducer(destination);
Console.WriteLine("Created producer!: " + producer);
string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
Console.WriteLine("### About to send message: " + request);
producer.Send(request);
Console.WriteLine("### Sent message!");
ITextMessage message = (ITextMessage) consumer.Receive();
if (message == null)
{
Console.WriteLine("### No message!!");
}
else
{
Console.WriteLine("### Received message: " + message + " of type: " + message.GetType());
String actual = message.Text;
Assert.IsTrue(factory != null, "created valid factory: " + factory);
Console.WriteLine("Worked!");
using (IConnection connection = factory.CreateConnection()) {
ISession session = connection.CreateSession();
Console.WriteLine("Created a session: " + session);
IDestination destination = session.GetQueue("FOO.BAR");
Assert.IsTrue(destination != null, "No queue available!");
Console.WriteLine("Using destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
producer.Send(request);
ITextMessage message = (ITextMessage) consumer.Receive();
Assert.AreEqual(expected, message.Text);
}
}
}
Console.WriteLine("### Message text is: " + actual);
}
}
catch (Exception e)
{
Console.WriteLine("Caught: " + e);
}
}
}
}
}

View File

@ -0,0 +1,60 @@
using NUnit.Framework;
using OpenWire.Client.Core;
using System;
namespace openwire_dotnet
{
[TestFixture]
public class EndianTest
{
[Test]
public void TestLongEndian()
{
long value = 0x0102030405060708l;
long newValue = DataStreamMarshaller.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x0807060504030201L, newValue);
long actual = DataStreamMarshaller.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestIntEndian()
{
int value = 0x12345678;
int newValue = DataStreamMarshaller.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x78563412, newValue);
int actual = DataStreamMarshaller.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestShortEndian()
{
short value = 0x1234;
short newValue = DataStreamMarshaller.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x3412, newValue);
short actual = DataStreamMarshaller.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
}
}

View File

@ -0,0 +1,59 @@
using System;
using System.IO;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace openwire_dotnet
{
public class TestMain
{
public static void Main(string[] args)
{
try
{
Console.WriteLine("About to connect to ActiveMQ");
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Console.WriteLine("Worked!");
using (IConnection connection = factory.CreateConnection())
{
Console.WriteLine("Created a connection!");
ISession session = connection.CreateSession();
Console.WriteLine("Created a session: " + session);
IDestination destination = session.GetQueue("FOO.BAR");
Console.WriteLine("Using destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
string expected = "Hello World!";
ITextMessage request = session.CreateTextMessage(expected);
producer.Send(request);
ITextMessage message = (ITextMessage) consumer.Receive();
if (message == null)
{
Console.WriteLine("### No message!!");
}
else
{
Console.WriteLine("### Received message: " + message + " of type: " + message.GetType());
String actual = message.Text;
Console.WriteLine("### Message text is: " + actual);
}
}
}
catch (Exception e)
{
Console.WriteLine("Caught: " + e);
Console.WriteLine("Stack: " + e.StackTrace);
}
}
}
}