diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index a9785f9fec..96f123e9f6 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -13,16 +13,16 @@ namespace OpenWire.Client { private ITransport transport; private ConnectionInfo info; + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private BrokerInfo brokerInfo; // from broker private WireFormatInfo brokerWireFormatInfo; // from broker private IList sessions = new ArrayList(); - private bool transacted; + private IDictionary consumers = new Hashtable(); // TODO threadsafe private bool connected; private bool closed; - private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private long sessionCounter; private long temporaryDestinationCounter; - private IDictionary consumers = new Hashtable(); // TODO threadsafe + private long localTransactionCounter; public Connection(ITransport transport, ConnectionInfo info) @@ -32,7 +32,7 @@ namespace OpenWire.Client this.transport.Command += new CommandHandler(OnCommand); this.transport.Start(); } - + /// /// Starts message delivery for this connection. /// @@ -53,16 +53,15 @@ namespace OpenWire.Client /// public ISession CreateSession() { - return CreateSession(transacted, acknowledgementMode); + return CreateSession(acknowledgementMode); } /// /// Creates a new session to work on this connection /// - public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) + public ISession CreateSession(AcknowledgementMode acknowledgementMode) { - CheckConnected(); - SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode); + SessionInfo info = CreateSessionInfo(acknowledgementMode); SyncRequest(info); Session session = new Session(this, info, acknowledgementMode); sessions.Add(session); @@ -71,10 +70,13 @@ namespace OpenWire.Client public void Dispose() { + /* foreach (Session session in sessions) { session.Dispose(); } + */ + DisposeOf(ConnectionId); sessions.Clear(); transport.Dispose(); closed = true; @@ -88,11 +90,6 @@ namespace OpenWire.Client set { this.transport = value; } } - public bool Transacted - { - get { return transacted; } - set { this.transacted = value; } - } public AcknowledgementMode AcknowledgementMode { @@ -112,13 +109,22 @@ namespace OpenWire.Client } } - public BrokerInfo BrokerInfo { + public ConnectionId ConnectionId + { + get { + return info.ConnectionId; + } + } + + public BrokerInfo BrokerInfo + { get { return brokerInfo; } } - public WireFormatInfo BrokerWireFormat { + public WireFormatInfo BrokerWireFormat + { get { return brokerWireFormatInfo; } @@ -131,6 +137,7 @@ namespace OpenWire.Client /// public Response SyncRequest(Command command) { + CheckConnected(); Response response = transport.Request(command); if (response is ExceptionResponse) { @@ -141,18 +148,17 @@ namespace OpenWire.Client return response; } - - protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) + public void OneWay(Command command) { - SessionInfo answer = new SessionInfo(); - SessionId sessionId = new SessionId(); - sessionId.ConnectionId = info.ConnectionId.Value; - lock (this) - { - sessionId.Value = ++sessionCounter; - } - answer.SessionId = sessionId; - return answer; + CheckConnected(); + transport.Oneway(command); + } + + public void DisposeOf(DataStructure objectId) + { + RemoveInfo command = new RemoveInfo(); + command.ObjectId = objectId; + SyncRequest(command); } @@ -167,6 +173,20 @@ namespace OpenWire.Client } } + /// + /// Creates a new local transaction ID + /// + public LocalTransactionId CreateLocalTransactionId() + { + LocalTransactionId id= new LocalTransactionId(); + id.ConnectionId = ConnectionId; + lock (this) + { + id.Value = (++localTransactionCounter); + } + return id; + } + protected void CheckConnected() { if (closed) @@ -175,9 +195,9 @@ namespace OpenWire.Client } if (!connected) { + connected = true; // now lets send the connection and see if we get an ack/nak SyncRequest(info); - connected = true; } } @@ -224,10 +244,12 @@ namespace OpenWire.Client consumer.Dispatch(message); } } - else if (command is WireFormatInfo) { + else if (command is WireFormatInfo) + { this.brokerWireFormatInfo = (WireFormatInfo) command; } - else if (command is BrokerInfo) { + else if (command is BrokerInfo) + { this.brokerInfo = (BrokerInfo) command; } else @@ -235,6 +257,19 @@ namespace OpenWire.Client Console.WriteLine("ERROR:ÊUnknown command: " + command); } } - + + protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode) + { + SessionInfo answer = new SessionInfo(); + SessionId sessionId = new SessionId(); + sessionId.ConnectionId = info.ConnectionId.Value; + lock (this) + { + sessionId.Value = ++sessionCounter; + } + answer.SessionId = sessionId; + return answer; + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs index a31719f93a..d2131c1547 100644 --- a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs @@ -18,6 +18,7 @@ using System.Collections; using OpenWire.Client.Commands; using System; using OpenWire.Client; +using System.Threading; namespace OpenWire.Client.Core { @@ -26,14 +27,52 @@ namespace OpenWire.Client.Core /// public class Dispatcher { - Queue queue = Queue.Synchronized( new Queue() ); + Queue queue = new Queue(); + Object semaphore = new Object(); + ArrayList messagesToRedeliver = new ArrayList(); + /// + /// Whem we start a transaction we must redeliver any rolled back messages + /// + public void RedeliverRolledBackMessages() { + lock (semaphore) + { + Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count); + foreach (ActiveMQMessage element in messagesToRedeliver) { + replacement.Enqueue(element); + } + messagesToRedeliver.Clear(); + + while (queue.Count > 0) + { + ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue(); + replacement.Enqueue(element); + } + queue = replacement; + Monitor.PulseAll(semaphore); + } + } + + /// + /// Redeliver the given message, putting it at the head of the queue + /// + public void Redeliver(ActiveMQMessage message) + { + lock (semaphore) { + messagesToRedeliver.Add(message); + } + } + /// /// Method Enqueue /// public void Enqueue(ActiveMQMessage message) { - queue.Enqueue(message); + lock (semaphore) + { + queue.Enqueue(message); + Monitor.PulseAll(semaphore); + } } /// @@ -41,9 +80,9 @@ namespace OpenWire.Client.Core /// public IMessage DequeueNoWait() { - lock (queue) + lock (semaphore) { - if (queue.Peek() != null) + if (queue.Count > 0) { return (IMessage) queue.Dequeue(); } @@ -54,10 +93,20 @@ namespace OpenWire.Client.Core /// /// Method Dequeue /// - public IMessage Dequeue(long timeout) + public IMessage Dequeue(int timeout) { - // TODO - throw new Exception("Not implemented yet"); + lock (semaphore) + { + if (queue.Count == 0) + { + Monitor.Wait(semaphore, timeout); + } + if (queue.Count > 0) + { + return (IMessage) queue.Dequeue(); + } + } + return null; } /// @@ -65,7 +114,10 @@ namespace OpenWire.Client.Core /// public IMessage Dequeue() { - return (IMessage) queue.Dequeue(); + lock (semaphore) + { + return (IMessage) queue.Dequeue(); + } } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ISynchronization.cs b/openwire-dotnet/src/OpenWire.Client/Core/ISynchronization.cs new file mode 100644 index 0000000000..1fb0fe059b --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/Core/ISynchronization.cs @@ -0,0 +1,36 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace OpenWire.Client.Core +{ + public interface ISynchronization + { + /// + /// Called before a commit + /// + void BeforeCommit(); + + /// + /// Called after a commit + /// + void AfterCommit(); + + /// + /// Called after a transaction rollback + /// + void AfterRollback(); + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/Core/TransactionContext.cs b/openwire-dotnet/src/OpenWire.Client/Core/TransactionContext.cs new file mode 100644 index 0000000000..7556c89e9b --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/Core/TransactionContext.cs @@ -0,0 +1,110 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System.Collections; +using OpenWire.Client.Commands; +using System; +using OpenWire.Client; + +namespace OpenWire.Client.Core +{ + public enum TransactionType + { + Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7 + } + + public class TransactionContext + { + private TransactionId transactionId; + private Session session; + private ArrayList synchronizations = new ArrayList(); + + public TransactionContext(Session session) { + this.session = session; + } + + public TransactionId TransactionId + { + get { return transactionId; } + } + + /// + /// Method AddSynchronization + /// + public void AddSynchronization(ISynchronization synchronization) + { + synchronizations.Add(synchronization); + } + + + public void Begin() + { + if (transactionId == null) + { + transactionId = session.Connection.CreateLocalTransactionId(); + + TransactionInfo info = new TransactionInfo(); + info.ConnectionId = session.Connection.ConnectionId; + info.TransactionId = transactionId; + info.Type = (int) TransactionType.Begin; + session.Connection.OneWay(info); + } + } + + + public void Rollback() + { + if (transactionId != null) + { + TransactionInfo info = new TransactionInfo(); + info.ConnectionId = session.Connection.ConnectionId; + info.TransactionId = transactionId; + info.Type = (int) TransactionType.Rollback; + + transactionId = null; + session.Connection.OneWay(info); + } + + foreach (ISynchronization synchronization in synchronizations) { + synchronization.AfterRollback(); + } + synchronizations.Clear(); + } + + public void Commit() + { + foreach (ISynchronization synchronization in synchronizations) { + synchronization.BeforeCommit(); + } + + if (transactionId != null) + { + TransactionInfo info = new TransactionInfo(); + info.ConnectionId = session.Connection.ConnectionId; + info.TransactionId = transactionId; + info.Type = (int) TransactionType.CommitOnePhase; + + transactionId = null; + session.Connection.OneWay(info); + } + + foreach (ISynchronization synchronization in synchronizations) { + synchronization.AfterCommit(); + } + synchronizations.Clear(); + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/IConnection.cs b/openwire-dotnet/src/OpenWire.Client/IConnection.cs index e64893f12c..cecc5d4ff9 100755 --- a/openwire-dotnet/src/OpenWire.Client/IConnection.cs +++ b/openwire-dotnet/src/OpenWire.Client/IConnection.cs @@ -17,47 +17,46 @@ using System; using OpenWire.Client.Commands; -namespace OpenWire.Client { - - public enum AcknowledgementMode { - Unknown, AutoAcknowledge, ClientAcknowledge, Transactional - } - - +namespace OpenWire.Client +{ + + public enum AcknowledgementMode + { + Unknown, AutoAcknowledge, ClientAcknowledge, Transactional + } + + + /// + /// Represents a connection with a message broker + /// + public interface IConnection : IDisposable, IStartable + { + /// - /// Represents a connection with a message broker + /// Creates a new session to work on this connection /// - public interface IConnection : IDisposable, IStartable { - - /// - /// Creates a new session to work on this connection - /// - ISession CreateSession(); - - /// - /// Creates a new session to work on this connection - /// - ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode); - - - // Properties - - bool Transacted { - get; - set; - } - - AcknowledgementMode AcknowledgementMode { - get; - set; - } - - String ClientId - { - get; - set; - } - - + ISession CreateSession(); + + /// + /// Creates a new session to work on this connection + /// + ISession CreateSession(AcknowledgementMode acknowledgementMode); + + + // Properties + + AcknowledgementMode AcknowledgementMode + { + get; + set; } + + String ClientId + { + get; + set; + } + + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs index ac583a6f2a..8be6589e57 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -35,7 +35,7 @@ namespace OpenWire.Client /// /// If a message is available within the timeout duration it is returned otherwise this method returns null /// - IMessage Receive(long timeout); + IMessage Receive(int timeout); /// /// If a message is available immediately it is returned otherwise this method returns null diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ActiveMQDestinationMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ActiveMQDestinationMarshaller.cs index 43c6284b45..7afa70f9ec 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ActiveMQDestinationMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ActiveMQDestinationMarshaller.cs @@ -57,7 +57,7 @@ namespace OpenWire.Client.IO ActiveMQDestination info = (ActiveMQDestination)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.PhysicalName, bs); + rc += WriteString(info.PhysicalName, bs); return rc + 0; } @@ -69,7 +69,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ActiveMQDestination info = (ActiveMQDestination)o; - WriteString(info.PhysicalName, dataOut, bs); + WriteString(info.PhysicalName, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs index 452759b1e1..e2b3b37990 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/BaseCommandMarshaller.cs @@ -58,7 +58,7 @@ namespace OpenWire.Client.IO BaseCommand info = (BaseCommand)o; int rc = base.Marshal1(wireFormat, info, bs); - bs.WriteBoolean(info.ResponseRequired); + bs.WriteBoolean(info.ResponseRequired); return rc + 1; } @@ -70,8 +70,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); BaseCommand info = (BaseCommand)o; - DataStreamMarshaller.WriteShort(info.CommandId, dataOut); - bs.ReadBoolean(); + DataStreamMarshaller.WriteShort(info.CommandId, dataOut); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/BrokerIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/BrokerIdMarshaller.cs index 52050e4026..6033641898 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/BrokerIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/BrokerIdMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO BrokerId info = (BrokerId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.Value, bs); + rc += WriteString(info.Value, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); BrokerId info = (BrokerId)o; - WriteString(info.Value, dataOut, bs); + WriteString(info.Value, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs index 40bce86a60..e871155871 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/BrokerInfoMarshaller.cs @@ -83,11 +83,11 @@ namespace OpenWire.Client.IO BrokerInfo info = (BrokerInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.BrokerId, bs); - rc += WriteString(info.BrokerURL, bs); - rc += MarshalObjectArray(wireFormat, info.PeerBrokerInfos, bs); - rc += WriteString(info.BrokerName, bs); - bs.WriteBoolean(info.SlaveBroker); + rc += Marshal1CachedObject(wireFormat, info.BrokerId, bs); + rc += WriteString(info.BrokerURL, bs); + rc += MarshalObjectArray(wireFormat, info.PeerBrokerInfos, bs); + rc += WriteString(info.BrokerName, bs); + bs.WriteBoolean(info.SlaveBroker); return rc + 0; } @@ -99,11 +99,11 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); BrokerInfo info = (BrokerInfo)o; - Marshal2CachedObject(wireFormat, info.BrokerId, dataOut, bs); - WriteString(info.BrokerURL, dataOut, bs); - MarshalObjectArray(wireFormat, info.PeerBrokerInfos, dataOut, bs); - WriteString(info.BrokerName, dataOut, bs); - bs.ReadBoolean(); + Marshal2CachedObject(wireFormat, info.BrokerId, dataOut, bs); + WriteString(info.BrokerURL, dataOut, bs); + MarshalObjectArray(wireFormat, info.PeerBrokerInfos, dataOut, bs); + WriteString(info.BrokerName, dataOut, bs); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionErrorMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionErrorMarshaller.cs index b5ff0256b7..136f21d4b1 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionErrorMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionErrorMarshaller.cs @@ -69,8 +69,8 @@ namespace OpenWire.Client.IO ConnectionError info = (ConnectionError)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += MarshalBrokerError(wireFormat, info.Exception, bs); - rc += Marshal1NestedObject(wireFormat, info.ConnectionId, bs); + rc += MarshalBrokerError(wireFormat, info.Exception, bs); + rc += Marshal1NestedObject(wireFormat, info.ConnectionId, bs); return rc + 0; } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ConnectionError info = (ConnectionError)o; - MarshalBrokerError(wireFormat, info.Exception, dataOut, bs); - Marshal2NestedObject(wireFormat, info.ConnectionId, dataOut, bs); + MarshalBrokerError(wireFormat, info.Exception, dataOut, bs); + Marshal2NestedObject(wireFormat, info.ConnectionId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionIdMarshaller.cs index 7d8d369cb1..777eab8519 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionIdMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO ConnectionId info = (ConnectionId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.Value, bs); + rc += WriteString(info.Value, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ConnectionId info = (ConnectionId)o; - WriteString(info.Value, dataOut, bs); + WriteString(info.Value, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs index ed252c244f..de03922be6 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConnectionInfoMarshaller.cs @@ -83,11 +83,11 @@ namespace OpenWire.Client.IO ConnectionInfo info = (ConnectionInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); - rc += WriteString(info.ClientId, bs); - rc += WriteString(info.Password, bs); - rc += WriteString(info.UserName, bs); - rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); + rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); + rc += WriteString(info.ClientId, bs); + rc += WriteString(info.Password, bs); + rc += WriteString(info.UserName, bs); + rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); return rc + 0; } @@ -99,11 +99,11 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ConnectionInfo info = (ConnectionInfo)o; - Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); - WriteString(info.ClientId, dataOut, bs); - WriteString(info.Password, dataOut, bs); - WriteString(info.UserName, dataOut, bs); - MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); + WriteString(info.ClientId, dataOut, bs); + WriteString(info.Password, dataOut, bs); + WriteString(info.UserName, dataOut, bs); + MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerIdMarshaller.cs index 720a668718..e1a1f42c82 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerIdMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO ConsumerId info = (ConsumerId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.ConnectionId, bs); - rc += Marshal1Long(wireFormat, info.SessionId, bs); - rc += Marshal1Long(wireFormat, info.Value, bs); + rc += WriteString(info.ConnectionId, bs); + rc += Marshal1Long(wireFormat, info.SessionId, bs); + rc += Marshal1Long(wireFormat, info.Value, bs); return rc + 0; } @@ -84,9 +84,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ConsumerId info = (ConsumerId)o; - WriteString(info.ConnectionId, dataOut, bs); - Marshal2Long(wireFormat, info.SessionId, dataOut, bs); - Marshal2Long(wireFormat, info.Value, dataOut, bs); + WriteString(info.ConnectionId, dataOut, bs); + Marshal2Long(wireFormat, info.SessionId, dataOut, bs); + Marshal2Long(wireFormat, info.Value, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs index 8cfd215a60..7b86767e88 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ConsumerInfoMarshaller.cs @@ -91,17 +91,17 @@ namespace OpenWire.Client.IO ConsumerInfo info = (ConsumerInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); - bs.WriteBoolean(info.Browser); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - bs.WriteBoolean(info.DispatchAsync); - rc += WriteString(info.Selector, bs); - rc += WriteString(info.SubcriptionName, bs); - bs.WriteBoolean(info.NoLocal); - bs.WriteBoolean(info.Exclusive); - bs.WriteBoolean(info.Retroactive); - rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); - bs.WriteBoolean(info.NetworkSubscription); + rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); + bs.WriteBoolean(info.Browser); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + bs.WriteBoolean(info.DispatchAsync); + rc += WriteString(info.Selector, bs); + rc += WriteString(info.SubcriptionName, bs); + bs.WriteBoolean(info.NoLocal); + bs.WriteBoolean(info.Exclusive); + bs.WriteBoolean(info.Retroactive); + rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); + bs.WriteBoolean(info.NetworkSubscription); return rc + 2; } @@ -113,19 +113,19 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ConsumerInfo info = (ConsumerInfo)o; - Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); - bs.ReadBoolean(); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - DataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut); - bs.ReadBoolean(); - WriteString(info.Selector, dataOut, bs); - WriteString(info.SubcriptionName, dataOut, bs); - bs.ReadBoolean(); - bs.ReadBoolean(); - bs.ReadBoolean(); - DataStreamMarshaller.WriteByte(info.Priority, dataOut); - MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); - bs.ReadBoolean(); + Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); + bs.ReadBoolean(); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + DataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut); + bs.ReadBoolean(); + WriteString(info.Selector, dataOut, bs); + WriteString(info.SubcriptionName, dataOut, bs); + bs.ReadBoolean(); + bs.ReadBoolean(); + bs.ReadBoolean(); + DataStreamMarshaller.WriteByte(info.Priority, dataOut); + MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ControlCommandMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ControlCommandMarshaller.cs index 9231618f2a..b83583dce2 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ControlCommandMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ControlCommandMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO ControlCommand info = (ControlCommand)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.Command, bs); + rc += WriteString(info.Command, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ControlCommand info = (ControlCommand)o; - WriteString(info.Command, dataOut, bs); + WriteString(info.Command, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs index 58725018df..cacb34de9f 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DataArrayResponseMarshaller.cs @@ -79,7 +79,7 @@ namespace OpenWire.Client.IO DataArrayResponse info = (DataArrayResponse)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += MarshalObjectArray(wireFormat, info.Data, bs); + rc += MarshalObjectArray(wireFormat, info.Data, bs); return rc + 0; } @@ -91,7 +91,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); DataArrayResponse info = (DataArrayResponse)o; - MarshalObjectArray(wireFormat, info.Data, dataOut, bs); + MarshalObjectArray(wireFormat, info.Data, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DataResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DataResponseMarshaller.cs index 36a8d189c0..505206aacf 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DataResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DataResponseMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO DataResponse info = (DataResponse)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1NestedObject(wireFormat, info.Data, bs); + rc += Marshal1NestedObject(wireFormat, info.Data, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); DataResponse info = (DataResponse)o; - Marshal2NestedObject(wireFormat, info.Data, dataOut, bs); + Marshal2NestedObject(wireFormat, info.Data, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs index 5148ecf733..76237f3cf1 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DestinationInfoMarshaller.cs @@ -83,10 +83,10 @@ namespace OpenWire.Client.IO DestinationInfo info = (DestinationInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += Marshal1Long(wireFormat, info.Timeout, bs); - rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); + rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += Marshal1Long(wireFormat, info.Timeout, bs); + rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); return rc + 1; } @@ -98,11 +98,11 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); DestinationInfo info = (DestinationInfo)o; - Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - DataStreamMarshaller.WriteByte(info.OperationType, dataOut); - Marshal2Long(wireFormat, info.Timeout, dataOut, bs); - MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + DataStreamMarshaller.WriteByte(info.OperationType, dataOut); + Marshal2Long(wireFormat, info.Timeout, dataOut, bs); + MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/DiscoveryEventMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/DiscoveryEventMarshaller.cs index f77755aea6..c5aa856753 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/DiscoveryEventMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/DiscoveryEventMarshaller.cs @@ -69,8 +69,8 @@ namespace OpenWire.Client.IO DiscoveryEvent info = (DiscoveryEvent)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.ServiceName, bs); - rc += WriteString(info.BrokerName, bs); + rc += WriteString(info.ServiceName, bs); + rc += WriteString(info.BrokerName, bs); return rc + 0; } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); DiscoveryEvent info = (DiscoveryEvent)o; - WriteString(info.ServiceName, dataOut, bs); - WriteString(info.BrokerName, dataOut, bs); + WriteString(info.ServiceName, dataOut, bs); + WriteString(info.BrokerName, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ExceptionResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ExceptionResponseMarshaller.cs index 6478af3b77..614b3cdb5a 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ExceptionResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ExceptionResponseMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO ExceptionResponse info = (ExceptionResponse)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += MarshalBrokerError(wireFormat, info.Exception, bs); + rc += MarshalBrokerError(wireFormat, info.Exception, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ExceptionResponse info = (ExceptionResponse)o; - MarshalBrokerError(wireFormat, info.Exception, dataOut, bs); + MarshalBrokerError(wireFormat, info.Exception, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs index 05baaa5b10..76115e82a0 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/IntegerResponseMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO IntegerResponse info = (IntegerResponse)o; int rc = base.Marshal1(wireFormat, info, bs); - + return rc + 1; } @@ -79,7 +79,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); IntegerResponse info = (IntegerResponse)o; - DataStreamMarshaller.WriteInt(info.Result, dataOut); + DataStreamMarshaller.WriteInt(info.Result, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/JournalQueueAckMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/JournalQueueAckMarshaller.cs index 3afedd9b68..7115e2f521 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/JournalQueueAckMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/JournalQueueAckMarshaller.cs @@ -69,8 +69,8 @@ namespace OpenWire.Client.IO JournalQueueAck info = (JournalQueueAck)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1NestedObject(wireFormat, info.Destination, bs); - rc += Marshal1NestedObject(wireFormat, info.MessageAck, bs); + rc += Marshal1NestedObject(wireFormat, info.Destination, bs); + rc += Marshal1NestedObject(wireFormat, info.MessageAck, bs); return rc + 0; } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); JournalQueueAck info = (JournalQueueAck)o; - Marshal2NestedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2NestedObject(wireFormat, info.MessageAck, dataOut, bs); + Marshal2NestedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2NestedObject(wireFormat, info.MessageAck, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/JournalTopicAckMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/JournalTopicAckMarshaller.cs index 513b60aea0..0a9fb0a3e2 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/JournalTopicAckMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/JournalTopicAckMarshaller.cs @@ -73,12 +73,12 @@ namespace OpenWire.Client.IO JournalTopicAck info = (JournalTopicAck)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1NestedObject(wireFormat, info.Destination, bs); - rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); - rc += Marshal1Long(wireFormat, info.MessageSequenceId, bs); - rc += WriteString(info.SubscritionName, bs); - rc += WriteString(info.ClientId, bs); - rc += Marshal1NestedObject(wireFormat, info.TransactionId, bs); + rc += Marshal1NestedObject(wireFormat, info.Destination, bs); + rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); + rc += Marshal1Long(wireFormat, info.MessageSequenceId, bs); + rc += WriteString(info.SubscritionName, bs); + rc += WriteString(info.ClientId, bs); + rc += Marshal1NestedObject(wireFormat, info.TransactionId, bs); return rc + 0; } @@ -90,12 +90,12 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); JournalTopicAck info = (JournalTopicAck)o; - Marshal2NestedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); - Marshal2Long(wireFormat, info.MessageSequenceId, dataOut, bs); - WriteString(info.SubscritionName, dataOut, bs); - WriteString(info.ClientId, dataOut, bs); - Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs); + Marshal2NestedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); + Marshal2Long(wireFormat, info.MessageSequenceId, dataOut, bs); + WriteString(info.SubscritionName, dataOut, bs); + WriteString(info.ClientId, dataOut, bs); + Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/JournalTraceMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/JournalTraceMarshaller.cs index f81bf3c525..101ea3891c 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/JournalTraceMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/JournalTraceMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO JournalTrace info = (JournalTrace)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.Message, bs); + rc += WriteString(info.Message, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); JournalTrace info = (JournalTrace)o; - WriteString(info.Message, dataOut, bs); + WriteString(info.Message, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs index 3db511646e..6e9e235696 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/JournalTransactionMarshaller.cs @@ -70,8 +70,8 @@ namespace OpenWire.Client.IO JournalTransaction info = (JournalTransaction)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1NestedObject(wireFormat, info.TransactionId, bs); - bs.WriteBoolean(info.WasPrepared); + rc += Marshal1NestedObject(wireFormat, info.TransactionId, bs); + bs.WriteBoolean(info.WasPrepared); return rc + 1; } @@ -83,9 +83,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); JournalTransaction info = (JournalTransaction)o; - Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs); - DataStreamMarshaller.WriteByte(info.Type, dataOut); - bs.ReadBoolean(); + Marshal2NestedObject(wireFormat, info.TransactionId, dataOut, bs); + DataStreamMarshaller.WriteByte(info.Type, dataOut); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/LocalTransactionIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/LocalTransactionIdMarshaller.cs index 6c170949a8..c15e53f268 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/LocalTransactionIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/LocalTransactionIdMarshaller.cs @@ -69,8 +69,8 @@ namespace OpenWire.Client.IO LocalTransactionId info = (LocalTransactionId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1Long(wireFormat, info.Value, bs); - rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); + rc += Marshal1Long(wireFormat, info.Value, bs); + rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); return rc + 0; } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); LocalTransactionId info = (LocalTransactionId)o; - Marshal2Long(wireFormat, info.Value, dataOut, bs); - Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); + Marshal2Long(wireFormat, info.Value, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs index 351abf18fd..e77d6ee31a 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageAckMarshaller.cs @@ -74,12 +74,12 @@ namespace OpenWire.Client.IO MessageAck info = (MessageAck)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); - rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); - rc += Marshal1NestedObject(wireFormat, info.FirstMessageId, bs); - rc += Marshal1NestedObject(wireFormat, info.LastMessageId, bs); - + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); + rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); + rc += Marshal1NestedObject(wireFormat, info.FirstMessageId, bs); + rc += Marshal1NestedObject(wireFormat, info.LastMessageId, bs); + return rc + 2; } @@ -90,13 +90,13 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); MessageAck info = (MessageAck)o; - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); - DataStreamMarshaller.WriteByte(info.AckType, dataOut); - Marshal2NestedObject(wireFormat, info.FirstMessageId, dataOut, bs); - Marshal2NestedObject(wireFormat, info.LastMessageId, dataOut, bs); - DataStreamMarshaller.WriteInt(info.MessageCount, dataOut); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); + DataStreamMarshaller.WriteByte(info.AckType, dataOut); + Marshal2NestedObject(wireFormat, info.FirstMessageId, dataOut, bs); + Marshal2NestedObject(wireFormat, info.LastMessageId, dataOut, bs); + DataStreamMarshaller.WriteInt(info.MessageCount, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs index cd084c4a96..15d68cf8eb 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchMarshaller.cs @@ -71,10 +71,10 @@ namespace OpenWire.Client.IO MessageDispatch info = (MessageDispatch)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += Marshal1NestedObject(wireFormat, info.Message, bs); - + rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += Marshal1NestedObject(wireFormat, info.Message, bs); + return rc + 1; } @@ -85,10 +85,10 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); MessageDispatch info = (MessageDispatch)o; - Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2NestedObject(wireFormat, info.Message, dataOut, bs); - DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); + Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2NestedObject(wireFormat, info.Message, dataOut, bs); + DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchNotificationMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchNotificationMarshaller.cs index f4d5ecb1d0..05678ff6f7 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchNotificationMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageDispatchNotificationMarshaller.cs @@ -71,10 +71,10 @@ namespace OpenWire.Client.IO MessageDispatchNotification info = (MessageDispatchNotification)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += Marshal1Long(wireFormat, info.DeliverySequenceId, bs); - rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); + rc += Marshal1CachedObject(wireFormat, info.ConsumerId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += Marshal1Long(wireFormat, info.DeliverySequenceId, bs); + rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); return rc + 0; } @@ -86,10 +86,10 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); MessageDispatchNotification info = (MessageDispatchNotification)o; - Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2Long(wireFormat, info.DeliverySequenceId, dataOut, bs); - Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConsumerId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2Long(wireFormat, info.DeliverySequenceId, dataOut, bs); + Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageIdMarshaller.cs index 8829779977..0d75b0d765 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageIdMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO MessageId info = (MessageId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); - rc += Marshal1Long(wireFormat, info.ProducerSequenceId, bs); - rc += Marshal1Long(wireFormat, info.BrokerSequenceId, bs); + rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); + rc += Marshal1Long(wireFormat, info.ProducerSequenceId, bs); + rc += Marshal1Long(wireFormat, info.BrokerSequenceId, bs); return rc + 0; } @@ -84,9 +84,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); MessageId info = (MessageId)o; - Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); - Marshal2Long(wireFormat, info.ProducerSequenceId, dataOut, bs); - Marshal2Long(wireFormat, info.BrokerSequenceId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); + Marshal2Long(wireFormat, info.ProducerSequenceId, dataOut, bs); + Marshal2Long(wireFormat, info.BrokerSequenceId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs index 0c021f7bcf..547caa512e 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/MessageMarshaller.cs @@ -92,30 +92,30 @@ namespace OpenWire.Client.IO Message info = (Message)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); - rc += Marshal1CachedObject(wireFormat, info.OriginalDestination, bs); - rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); - rc += Marshal1CachedObject(wireFormat, info.OriginalTransactionId, bs); - rc += WriteString(info.GroupID, bs); - rc += WriteString(info.CorrelationId, bs); - bs.WriteBoolean(info.Persistent); - rc += Marshal1Long(wireFormat, info.Expiration, bs); - rc += Marshal1NestedObject(wireFormat, info.ReplyTo, bs); - rc += Marshal1Long(wireFormat, info.Timestamp, bs); - rc += WriteString(info.Type, bs); - bs.WriteBoolean(info.Content!=null); + rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); + rc += Marshal1CachedObject(wireFormat, info.OriginalDestination, bs); + rc += Marshal1NestedObject(wireFormat, info.MessageId, bs); + rc += Marshal1CachedObject(wireFormat, info.OriginalTransactionId, bs); + rc += WriteString(info.GroupID, bs); + rc += WriteString(info.CorrelationId, bs); + bs.WriteBoolean(info.Persistent); + rc += Marshal1Long(wireFormat, info.Expiration, bs); + rc += Marshal1NestedObject(wireFormat, info.ReplyTo, bs); + rc += Marshal1Long(wireFormat, info.Timestamp, bs); + rc += WriteString(info.Type, bs); + bs.WriteBoolean(info.Content!=null); rc += info.Content==null ? 0 : info.Content.Length+4; - bs.WriteBoolean(info.MarshalledProperties!=null); + bs.WriteBoolean(info.MarshalledProperties!=null); rc += info.MarshalledProperties==null ? 0 : info.MarshalledProperties.Length+4; - rc += Marshal1NestedObject(wireFormat, info.DataStructure, bs); - rc += Marshal1CachedObject(wireFormat, info.TargetConsumerId, bs); - bs.WriteBoolean(info.Compressed); - rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); - rc += Marshal1Long(wireFormat, info.Arrival, bs); - rc += WriteString(info.UserID, bs); - bs.WriteBoolean(info.RecievedByDFBridge); + rc += Marshal1NestedObject(wireFormat, info.DataStructure, bs); + rc += Marshal1CachedObject(wireFormat, info.TargetConsumerId, bs); + bs.WriteBoolean(info.Compressed); + rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); + rc += Marshal1Long(wireFormat, info.Arrival, bs); + rc += WriteString(info.UserID, bs); + bs.WriteBoolean(info.RecievedByDFBridge); return rc + 3; } @@ -127,37 +127,37 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); Message info = (Message)o; - Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.OriginalDestination, dataOut, bs); - Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.OriginalTransactionId, dataOut, bs); - WriteString(info.GroupID, dataOut, bs); - DataStreamMarshaller.WriteInt(info.GroupSequence, dataOut); - WriteString(info.CorrelationId, dataOut, bs); - bs.ReadBoolean(); - Marshal2Long(wireFormat, info.Expiration, dataOut, bs); - DataStreamMarshaller.WriteByte(info.Priority, dataOut); - Marshal2NestedObject(wireFormat, info.ReplyTo, dataOut, bs); - Marshal2Long(wireFormat, info.Timestamp, dataOut, bs); - WriteString(info.Type, dataOut, bs); - if(bs.ReadBoolean()) { + Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.OriginalDestination, dataOut, bs); + Marshal2NestedObject(wireFormat, info.MessageId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.OriginalTransactionId, dataOut, bs); + WriteString(info.GroupID, dataOut, bs); + DataStreamMarshaller.WriteInt(info.GroupSequence, dataOut); + WriteString(info.CorrelationId, dataOut, bs); + bs.ReadBoolean(); + Marshal2Long(wireFormat, info.Expiration, dataOut, bs); + DataStreamMarshaller.WriteByte(info.Priority, dataOut); + Marshal2NestedObject(wireFormat, info.ReplyTo, dataOut, bs); + Marshal2Long(wireFormat, info.Timestamp, dataOut, bs); + WriteString(info.Type, dataOut, bs); + if(bs.ReadBoolean()) { DataStreamMarshaller.WriteInt(info.Content.Length, dataOut); dataOut.Write(info.Content); } - if(bs.ReadBoolean()) { + if(bs.ReadBoolean()) { DataStreamMarshaller.WriteInt(info.MarshalledProperties.Length, dataOut); dataOut.Write(info.MarshalledProperties); } - Marshal2NestedObject(wireFormat, info.DataStructure, dataOut, bs); - Marshal2CachedObject(wireFormat, info.TargetConsumerId, dataOut, bs); - bs.ReadBoolean(); - DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); - MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); - Marshal2Long(wireFormat, info.Arrival, dataOut, bs); - WriteString(info.UserID, dataOut, bs); - bs.ReadBoolean(); + Marshal2NestedObject(wireFormat, info.DataStructure, dataOut, bs); + Marshal2CachedObject(wireFormat, info.TargetConsumerId, dataOut, bs); + bs.ReadBoolean(); + DataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut); + MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); + Marshal2Long(wireFormat, info.Arrival, dataOut, bs); + WriteString(info.UserID, dataOut, bs); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ProducerIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ProducerIdMarshaller.cs index b75bf022c3..94b414ddf5 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ProducerIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ProducerIdMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO ProducerId info = (ProducerId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.ConnectionId, bs); - rc += Marshal1Long(wireFormat, info.Value, bs); - rc += Marshal1Long(wireFormat, info.SessionId, bs); + rc += WriteString(info.ConnectionId, bs); + rc += Marshal1Long(wireFormat, info.Value, bs); + rc += Marshal1Long(wireFormat, info.SessionId, bs); return rc + 0; } @@ -84,9 +84,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ProducerId info = (ProducerId)o; - WriteString(info.ConnectionId, dataOut, bs); - Marshal2Long(wireFormat, info.Value, dataOut, bs); - Marshal2Long(wireFormat, info.SessionId, dataOut, bs); + WriteString(info.ConnectionId, dataOut, bs); + Marshal2Long(wireFormat, info.Value, dataOut, bs); + Marshal2Long(wireFormat, info.SessionId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs index 99403c7454..36a4cfe94d 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ProducerInfoMarshaller.cs @@ -81,9 +81,9 @@ namespace OpenWire.Client.IO ProducerInfo info = (ProducerInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); + rc += Marshal1CachedObject(wireFormat, info.ProducerId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += MarshalObjectArray(wireFormat, info.BrokerPath, bs); return rc + 0; } @@ -95,9 +95,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); ProducerInfo info = (ProducerInfo)o; - Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ProducerId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + MarshalObjectArray(wireFormat, info.BrokerPath, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/RemoveInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/RemoveInfoMarshaller.cs index ee6c9e6161..d597702960 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/RemoveInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/RemoveInfoMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO RemoveInfo info = (RemoveInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ObjectId, bs); + rc += Marshal1CachedObject(wireFormat, info.ObjectId, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); RemoveInfo info = (RemoveInfo)o; - Marshal2CachedObject(wireFormat, info.ObjectId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ObjectId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/RemoveSubscriptionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/RemoveSubscriptionInfoMarshaller.cs index 14dd782428..060ff25ec2 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/RemoveSubscriptionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/RemoveSubscriptionInfoMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO RemoveSubscriptionInfo info = (RemoveSubscriptionInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); - rc += WriteString(info.SubcriptionName, bs); - rc += WriteString(info.ClientId, bs); + rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); + rc += WriteString(info.SubcriptionName, bs); + rc += WriteString(info.ClientId, bs); return rc + 0; } @@ -84,9 +84,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); RemoveSubscriptionInfo info = (RemoveSubscriptionInfo)o; - Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); - WriteString(info.SubcriptionName, dataOut, bs); - WriteString(info.ClientId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); + WriteString(info.SubcriptionName, dataOut, bs); + WriteString(info.ClientId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs index 012c2122c7..8ea4683fe9 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/ResponseMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO Response info = (Response)o; int rc = base.Marshal1(wireFormat, info, bs); - + return rc + 1; } @@ -79,7 +79,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); Response info = (Response)o; - DataStreamMarshaller.WriteShort(info.CorrelationId, dataOut); + DataStreamMarshaller.WriteShort(info.CorrelationId, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/SessionIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/SessionIdMarshaller.cs index 3d00c517a9..f74166c8f1 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/SessionIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/SessionIdMarshaller.cs @@ -69,8 +69,8 @@ namespace OpenWire.Client.IO SessionId info = (SessionId)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.ConnectionId, bs); - rc += Marshal1Long(wireFormat, info.Value, bs); + rc += WriteString(info.ConnectionId, bs); + rc += Marshal1Long(wireFormat, info.Value, bs); return rc + 0; } @@ -82,8 +82,8 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); SessionId info = (SessionId)o; - WriteString(info.ConnectionId, dataOut, bs); - Marshal2Long(wireFormat, info.Value, dataOut, bs); + WriteString(info.ConnectionId, dataOut, bs); + Marshal2Long(wireFormat, info.Value, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/SessionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/SessionInfoMarshaller.cs index 896d0ae710..f77051703d 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/SessionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/SessionInfoMarshaller.cs @@ -68,7 +68,7 @@ namespace OpenWire.Client.IO SessionInfo info = (SessionInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.SessionId, bs); + rc += Marshal1CachedObject(wireFormat, info.SessionId, bs); return rc + 0; } @@ -80,7 +80,7 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); SessionInfo info = (SessionInfo)o; - Marshal2CachedObject(wireFormat, info.SessionId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.SessionId, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/SubscriptionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/SubscriptionInfoMarshaller.cs index 71e46804ea..f8fc5034f3 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/SubscriptionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/SubscriptionInfoMarshaller.cs @@ -71,10 +71,10 @@ namespace OpenWire.Client.IO SubscriptionInfo info = (SubscriptionInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += WriteString(info.ClientId, bs); - rc += Marshal1CachedObject(wireFormat, info.Destination, bs); - rc += WriteString(info.Selector, bs); - rc += WriteString(info.SubcriptionName, bs); + rc += WriteString(info.ClientId, bs); + rc += Marshal1CachedObject(wireFormat, info.Destination, bs); + rc += WriteString(info.Selector, bs); + rc += WriteString(info.SubcriptionName, bs); return rc + 0; } @@ -86,10 +86,10 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); SubscriptionInfo info = (SubscriptionInfo)o; - WriteString(info.ClientId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); - WriteString(info.Selector, dataOut, bs); - WriteString(info.SubcriptionName, dataOut, bs); + WriteString(info.ClientId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.Destination, dataOut, bs); + WriteString(info.Selector, dataOut, bs); + WriteString(info.SubcriptionName, dataOut, bs); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs index 3cbf95b131..18120baeb9 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/TransactionInfoMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO TransactionInfo info = (TransactionInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); - rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); - + rc += Marshal1CachedObject(wireFormat, info.ConnectionId, bs); + rc += Marshal1CachedObject(wireFormat, info.TransactionId, bs); + return rc + 1; } @@ -83,9 +83,9 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); TransactionInfo info = (TransactionInfo)o; - Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); - Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); - DataStreamMarshaller.WriteByte(info.Type, dataOut); + Marshal2CachedObject(wireFormat, info.ConnectionId, dataOut, bs); + Marshal2CachedObject(wireFormat, info.TransactionId, dataOut, bs); + DataStreamMarshaller.WriteByte(info.Type, dataOut); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs index f50f908d61..8f93d48fe7 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/WireFormatInfoMarshaller.cs @@ -73,10 +73,10 @@ namespace OpenWire.Client.IO WireFormatInfo info = (WireFormatInfo)o; int rc = base.Marshal1(wireFormat, info, bs); - bs.WriteBoolean(info.CacheEnabled); - bs.WriteBoolean(info.CompressionEnabled); - bs.WriteBoolean(info.StackTraceEnabled); - bs.WriteBoolean(info.TcpNoDelayEnabled); + bs.WriteBoolean(info.CacheEnabled); + bs.WriteBoolean(info.CompressionEnabled); + bs.WriteBoolean(info.StackTraceEnabled); + bs.WriteBoolean(info.TcpNoDelayEnabled); return rc + 9; } @@ -88,12 +88,12 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); WireFormatInfo info = (WireFormatInfo)o; - dataOut.Write(info.Magic, 0, 8); - DataStreamMarshaller.WriteInt(info.Version, dataOut); - bs.ReadBoolean(); - bs.ReadBoolean(); - bs.ReadBoolean(); - bs.ReadBoolean(); + dataOut.Write(info.Magic, 0, 8); + DataStreamMarshaller.WriteInt(info.Version, dataOut); + bs.ReadBoolean(); + bs.ReadBoolean(); + bs.ReadBoolean(); + bs.ReadBoolean(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs index 545555e09e..a6cbaf6686 100644 --- a/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/IO/XATransactionIdMarshaller.cs @@ -70,9 +70,9 @@ namespace OpenWire.Client.IO XATransactionId info = (XATransactionId)o; int rc = base.Marshal1(wireFormat, info, bs); - bs.WriteBoolean(info.GlobalTransactionId!=null); + bs.WriteBoolean(info.GlobalTransactionId!=null); rc += info.GlobalTransactionId==null ? 0 : info.GlobalTransactionId.Length+4; - bs.WriteBoolean(info.BranchQualifier!=null); + bs.WriteBoolean(info.BranchQualifier!=null); rc += info.BranchQualifier==null ? 0 : info.BranchQualifier.Length+4; return rc + 1; @@ -85,12 +85,12 @@ namespace OpenWire.Client.IO base.Marshal2(wireFormat, o, dataOut, bs); XATransactionId info = (XATransactionId)o; - DataStreamMarshaller.WriteInt(info.FormatId, dataOut); - if(bs.ReadBoolean()) { + DataStreamMarshaller.WriteInt(info.FormatId, dataOut); + if(bs.ReadBoolean()) { DataStreamMarshaller.WriteInt(info.GlobalTransactionId.Length, dataOut); dataOut.Write(info.GlobalTransactionId); } - if(bs.ReadBoolean()) { + if(bs.ReadBoolean()) { DataStreamMarshaller.WriteInt(info.BranchQualifier.Length, dataOut); dataOut.Write(info.BranchQualifier); } diff --git a/openwire-dotnet/src/OpenWire.Client/ISession.cs b/openwire-dotnet/src/OpenWire.Client/ISession.cs index 216b9bdd67..b19e487df8 100755 --- a/openwire-dotnet/src/OpenWire.Client/ISession.cs +++ b/openwire-dotnet/src/OpenWire.Client/ISession.cs @@ -104,6 +104,21 @@ namespace OpenWire.Client /// Creates a new binary message with the given body /// IBytesMessage CreateBytesMessage(byte[] body); + + + // Transaction methods + + /// + /// If this is a transactional session then commit all message + /// send and acknowledgements for producers and consumers in this session + /// + void Commit(); + + /// + /// If this is a transactional session then rollback all message + /// send and acknowledgements for producers and consumers in this session + /// + void Rollback(); } } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs index d3d928c770..59620acb67 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -22,10 +22,11 @@ using OpenWire.Client.Core; namespace OpenWire.Client { - public enum AckType { + public enum AckType + { DeliveredAck = 0, // Message delivered but not consumed - ConsumedAck = 1, // Message consumed, discard - PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway + PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway + ConsumedAck = 2 // Message consumed, discard } @@ -40,9 +41,12 @@ namespace OpenWire.Client private AcknowledgementMode acknowledgementMode; private bool closed; private Dispatcher dispatcher = new Dispatcher(); + private int maximumRedeliveryCount = 10; + private int redeliveryTimeout = 500; public event MessageListener Listener; + public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode) { this.session = session; @@ -50,12 +54,29 @@ namespace OpenWire.Client this.acknowledgementMode = acknowledgementMode; } - public ConsumerId ConsumerId { + public ConsumerId ConsumerId + { get { return info.ConsumerId; } } - + + public int MaximumRedeliveryCount + { + get { return maximumRedeliveryCount; } + set { maximumRedeliveryCount = value; } + } + + public int RedeliveryTimeout + { + get { return redeliveryTimeout; } + set { redeliveryTimeout = value; } + } + + public void RedeliverRolledBackMessages() + { + dispatcher.RedeliverRolledBackMessages(); + } /// /// Method Dispatch @@ -65,7 +86,8 @@ namespace OpenWire.Client { dispatcher.Enqueue(message); - if (Listener != null) { + if (Listener != null) + { // lets dispatch to the thread pool for this connection for messages to be processed ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages)); } @@ -77,7 +99,7 @@ namespace OpenWire.Client return AutoAcknowledge(dispatcher.Dequeue()); } - public IMessage Receive(long timeout) + public IMessage Receive(int timeout) { CheckClosed(); return AutoAcknowledge(dispatcher.Dequeue(timeout)); @@ -102,12 +124,15 @@ namespace OpenWire.Client /// public void DispatchAsyncMessages() { - while (Listener != null) { + while (Listener != null) + { IMessage message = dispatcher.DequeueNoWait(); - if (message != null) { + if (message != null) + { Listener(message); } - else { + else + { break; } } @@ -145,7 +170,7 @@ namespace OpenWire.Client DoAcknowledge(message); } } - + protected void DoAcknowledge(Message message) { MessageAck ack = CreateMessageAck(message); @@ -163,10 +188,70 @@ namespace OpenWire.Client ack.FirstMessageId = message.MessageId; ack.LastMessageId = message.MessageId; ack.MessageCount = 1; - ack.TransactionId = message.TransactionId; + + if (session.Transacted) + { + session.DoStartTransaction(); + ack.TransactionId = session.TransactionContext.TransactionId; + session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this, message)); + } return ack; } + public void AfterRollback(ActiveMQMessage message) + { + // lets redeliver the message again + message.RedeliveryCounter += 1; + if (message.RedeliveryCounter > MaximumRedeliveryCount) + { + // lets send back a poisoned pill + MessageAck ack = new MessageAck(); + ack.AckType = (int) AckType.PoisonAck; + ack.ConsumerId = info.ConsumerId; + ack.Destination = message.Destination; + ack.FirstMessageId = message.MessageId; + ack.LastMessageId = message.MessageId; + ack.MessageCount = 1; + session.Connection.OneWay(ack); + } + else + { + dispatcher.Redeliver(message); + + if (Listener != null) + { + // lets re-dispatch the message at some point in the future + Thread.Sleep(RedeliveryTimeout); + ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages)); + } + } + } + } + + // TODO maybe there's a cleaner way of creating stateful delegates to make this code neater + class MessageConsumerSynchronization : ISynchronization + { + private MessageConsumer consumer; + private Message message; + + public MessageConsumerSynchronization(MessageConsumer consumer, Message message) + { + this.message = message; + this.consumer = consumer; + } + + public void BeforeCommit() + { + } + + public void AfterCommit() + { + } + + public void AfterRollback() + { + consumer.AfterRollback((ActiveMQMessage) message); + } } } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs index 3496e3eb31..0761650946 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs @@ -29,11 +29,11 @@ namespace OpenWire.Client private ProducerInfo info; private long messageCounter; - bool persistent; - long timeToLive; - int priority; - bool disableMessageID; - bool disableMessageTimestamp; + bool persistent; + long timeToLive; + int priority; + bool disableMessageID; + bool disableMessageTimestamp; public MessageProducer(Session session, ProducerInfo info) { @@ -59,42 +59,48 @@ namespace OpenWire.Client activeMessage.ProducerId = info.ProducerId; activeMessage.Destination = ActiveMQDestination.Transform(destination); + if (session.Transacted) + { + session.DoStartTransaction(); + activeMessage.TransactionId = session.TransactionContext.TransactionId; + } + session.DoSend(destination, message); } public void Dispose() { - session.DisposeOf(info.ProducerId); + session.Connection.DisposeOf(info.ProducerId); } - - public bool Persistent - { - get { return persistent; } - set { this.persistent = value; } - } - - public long TimeToLive - { - get { return timeToLive; } - set { this.timeToLive = value; } -} - public int Priority - { - get { return priority; } - set { this.priority = value; } - } - - public bool DisableMessageID - { - get { return disableMessageID; } - set { this.disableMessageID = value; } - } - - public bool DisableMessageTimestamp - { - get { return disableMessageTimestamp; } - set { this.disableMessageTimestamp = value; } - } - + + public bool Persistent + { + get { return persistent; } + set { this.persistent = value; } + } + + public long TimeToLive + { + get { return timeToLive; } + set { this.timeToLive = value; } + } + public int Priority + { + get { return priority; } + set { this.priority = value; } + } + + public bool DisableMessageID + { + get { return disableMessageID; } + set { this.disableMessageID = value; } + } + + public bool DisableMessageTimestamp + { + get { return disableMessageTimestamp; } + set { this.disableMessageTimestamp = value; } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index 01aad0474c..7f3e2dc10f 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -33,17 +33,20 @@ namespace OpenWire.Client private long producerCounter; private int prefetchSize = 1000; private IDictionary consumers = new Hashtable(); + private TransactionContext transactionContext; public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) { this.connection = connection; this.info = info; this.acknowledgementMode = acknowledgementMode; + transactionContext = new TransactionContext(this); } + public void Dispose() { - DisposeOf(info.SessionId); + connection.DisposeOf(info.SessionId); } public IMessageProducer CreateProducer() @@ -174,13 +177,52 @@ namespace OpenWire.Client return answer; } + public void Commit() + { + if (! Transacted) + { + throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); + } + transactionContext.Commit(); + } + + public void Rollback() + { + if (! Transacted) + { + throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); + } + transactionContext.Rollback(); + + // lets ensure all the consumers redeliver any rolled back messages + foreach (MessageConsumer consumer in consumers.Values) + { + consumer.RedeliverRolledBackMessages(); + } + } + + // Properties + public Connection Connection { - get { - return connection; - } + get { return connection; } + } + + public SessionId SessionId + { + get { return info.SessionId; } + } + + public bool Transacted + { + get { return acknowledgementMode == AcknowledgementMode.Transactional; } + } + + public TransactionContext TransactionContext + { + get { return transactionContext; } } // Implementation methods @@ -191,21 +233,22 @@ namespace OpenWire.Client connection.SyncRequest(command); } - public void DisposeOf(DataStructure objectId) + /// + /// Ensures that a transaction is started + /// + public void DoStartTransaction() { - // TODO dispose of all the session first? - RemoveInfo command = new RemoveInfo(); - command.ObjectId = objectId; - connection.SyncRequest(command); + if (Transacted) + { + transactionContext.Begin(); + } } public void DisposeOf(ConsumerId objectId) { consumers.Remove(objectId); connection.RemoveConsumer(objectId); - RemoveInfo command = new RemoveInfo(); - command.ObjectId = objectId; - connection.SyncRequest(command); + connection.DisposeOf(objectId); } public void DispatchAsyncMessages(object state) diff --git a/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs b/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs index 12cfc70d69..239485da49 100755 --- a/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs @@ -27,65 +27,65 @@ namespace OpenWire.Client [TestFixture] public class ConsumerTest : TestSupport { - IConnectionFactory factory; - IConnection connection; - IDestination destination; + IConnectionFactory factory; + IConnection connection; + IDestination destination; - [SetUp] - protected void SetUp() - { - factory = new ConnectionFactory("localhost", 61616); - connection = factory.CreateConnection(); - } + [SetUp] + protected void SetUp() + { + factory = new ConnectionFactory("localhost", 61616); + connection = factory.CreateConnection(); + } - [TearDown] - protected void TearDown() - { - connection.Dispose(); - } + [TearDown] + protected void TearDown() + { + connection.Dispose(); + } [Test] - [Ignore("Not fully implemented yet.")] - public void testDurableConsumerSelectorChange() - { + [Ignore("Not fully implemented yet.")] + public void testDurableConsumerSelectorChange() + { - // Receive a message with the JMS API - connection.ClientId="test"; - connection.Start(); - - ISession session = connection.CreateSession(false, AcknowledgementMode.AutoAcknowledge); - destination = session.GetTopic("foo"); - IMessageProducer producer = session.CreateProducer(destination); - producer.Persistent = true; - IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='red'", false); + // Receive a message with the JMS API + connection.ClientId="test"; + connection.Start(); + + ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + destination = session.GetTopic("foo"); + IMessageProducer producer = session.CreateProducer(destination); + producer.Persistent = true; + IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='red'", false); - // Send the messages - ITextMessage message = session.CreateTextMessage("1st"); - //message.SetStringProperty("color", "red"); - producer.Send(message); - - IMessage m = consumer.Receive(1000); - Assert.IsNotNull(m); - Assert.AreEqual("1st", ((ITextMessage)m).Text ); + // Send the messages + ITextMessage message = session.CreateTextMessage("1st"); + //message.SetStringProperty("color", "red"); + producer.Send(message); + + IMessage m = consumer.Receive(1000); + Assert.IsNotNull(m); + Assert.AreEqual("1st", ((ITextMessage)m).Text ); - // Change the subscription. - consumer.Dispose(); - consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='blue'", false); - - message = session.CreateTextMessage("2nd"); - // message.setStringProperty("color", "red"); - producer.Send(message); - message = session.CreateTextMessage("3rd"); - // message.setStringProperty("color", "blue"); - producer.Send(message); + // Change the subscription. + consumer.Dispose(); + consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='blue'", false); + + message = session.CreateTextMessage("2nd"); + // message.setStringProperty("color", "red"); + producer.Send(message); + message = session.CreateTextMessage("3rd"); + // message.setStringProperty("color", "blue"); + producer.Send(message); - // Selector should skip the 2nd message. - m = consumer.Receive(1000); - Assert.IsNotNull(m); - Assert.AreEqual("3rd", ((ITextMessage)m).Text); - - Assert.IsNull(consumer.ReceiveNoWait()); - } + // Selector should skip the 2nd message. + m = consumer.Receive(1000); + Assert.IsNotNull(m); + Assert.AreEqual("3rd", ((ITextMessage)m).Text); + + Assert.IsNull(consumer.ReceiveNoWait()); + } } } diff --git a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs index 05a23851ef..1e82ffa7c2 100644 --- a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs +++ b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs @@ -73,12 +73,18 @@ namespace OpenWire.Client protected virtual IDestination CreateDestination(ISession session) { - string name = "Test.DotNet." + GetType().Name; + string name = CreateDestinationName(); IDestination destination = session.GetQueue(name); Console.WriteLine("Using queue: " + destination); return destination; } + + protected virtual string CreateDestinationName() + { + string name = "Test.DotNet." + GetType().Name; + return name; + } protected virtual IMessage CreateMessage(ISession session) { diff --git a/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs b/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs new file mode 100644 index 0000000000..2b26bbb3d6 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs @@ -0,0 +1,290 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.IO; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; +using System.Collections; + +namespace OpenWire.Client +{ + [TestFixture] + public class TransactionTest : TestSupport + { + private static int destinationCounter; + + IDestination destination; + IConnection connection; + ISession session; + IMessageProducer producer; + IMessageConsumer consumer; + + [SetUp] + public void SetUp() + { + Connect(); + + // lets consume any outstanding messages from previous test runs + while (consumer.Receive(1000) != null) + { + } + session.Commit(); + } + + + + [TearDown] + public void TearDown() + { + Disconnect(); + } + + [Test] + public void TestSendRollback() + { + IMessage[] outbound = new IMessage[]{ + session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message") + }; + + //sends a message + producer.Send(outbound[0]); + session.Commit(); + + //sends a message that gets rollbacked + producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); + session.Rollback(); + + //sends a message + producer.Send(outbound[1]); + session.Commit(); + + //receives the first message + ArrayList messages = new ArrayList(); + Console.WriteLine("About to consume message 1"); + IMessage message = consumer.Receive(1000); + messages.Add(message); + Console.WriteLine("Received: " + message); + + //receives the second message + Console.WriteLine("About to consume message 2"); + message = consumer.Receive(4000); + messages.Add(message); + Console.WriteLine("Received: " + message); + + //validates that the rollbacked was not consumed + session.Commit(); + IMessage[] inbound = new IMessage[messages.Count]; + messages.CopyTo(inbound); + AssertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + [Test] + public void TestSendSessionClose() + { + IMessage[] outbound = new IMessage[]{ + session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message") + }; + + //sends a message + producer.Send(outbound[0]); + session.Commit(); + + //sends a message that gets rollbacked + producer.Send(session.CreateTextMessage("I'm going to get rolled back.")); + consumer.Dispose(); + session.Dispose(); + + Reconnect(); + + //sends a message + producer.Send(outbound[1]); + session.Commit(); + + //receives the first message + ArrayList messages = new ArrayList(); + Console.WriteLine("About to consume message 1"); + IMessage message = consumer.Receive(1000); + messages.Add(message); + Console.WriteLine("Received: " + message); + + //receives the second message + Console.WriteLine("About to consume message 2"); + message = consumer.Receive(4000); + messages.Add(message); + Console.WriteLine("Received: " + message); + + //validates that the rollbacked was not consumed + session.Commit(); + IMessage[] inbound = new IMessage[messages.Count]; + messages.CopyTo(inbound); + AssertTextMessagesEqual("Rollback did not work.", outbound, inbound); + } + + [Test] + public void TestReceiveRollback() + { + IMessage[] outbound = new IMessage[]{ + session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message") + }; + + //sent both messages + producer.Send(outbound[0]); + producer.Send(outbound[1]); + session.Commit(); + + Console.WriteLine("Sent 0: " + outbound[0]); + Console.WriteLine("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + IMessage message = consumer.Receive(1000); + messages.Add(message); + Assert.AreEqual(outbound[0], message); + session.Commit(); + + // rollback so we can get that last message again. + message = consumer.Receive(1000); + Assert.IsNotNull(message); + Assert.AreEqual(outbound[1], message); + session.Rollback(); + + // Consume again.. the previous message should + // get redelivered. + message = consumer.Receive(5000); + Assert.IsNotNull(message, "Should have re-received the message again!"); + messages.Add(message); + session.Commit(); + + IMessage[] inbound = new IMessage[messages.Count]; + messages.CopyTo(inbound); + AssertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + + [Test] + public void TestReceiveTwoThenRollback() + { + IMessage[] outbound = new IMessage[]{ + session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message") + }; + + producer.Send(outbound[0]); + producer.Send(outbound[1]); + session.Commit(); + + Console.WriteLine("Sent 0: " + outbound[0]); + Console.WriteLine("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + IMessage message = consumer.Receive(1000); + AssertTextMessageEqual("first mesage received before rollback", outbound[0], message); + + message = consumer.Receive(1000); + Assert.IsNotNull(message); + AssertTextMessageEqual("second message received before rollback", outbound[1], message); + session.Rollback(); + + // Consume again.. the previous message should + // get redelivered. + message = consumer.Receive(5000); + Assert.IsNotNull(message, "Should have re-received the first message again!"); + messages.Add(message); + AssertTextMessageEqual("first message received after rollback", outbound[0], message); + + message = consumer.Receive(5000); + Assert.IsNotNull(message, "Should have re-received the second message again!"); + messages.Add(message); + AssertTextMessageEqual("second message received after rollback", outbound[1], message); + + Assert.IsNull(consumer.ReceiveNoWait()); + session.Commit(); + + IMessage[] inbound = new IMessage[messages.Count]; + messages.CopyTo(inbound); + AssertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + + protected override string CreateDestinationName() + { + // TODO - how can we get the test name? + return base.CreateDestinationName() + (++destinationCounter); + } + + protected void AssertTextMessagesEqual(String message, IMessage[] expected, IMessage[] actual) + { + Assert.AreEqual(expected.Length, actual.Length, "Incorrect number of messages received"); + + for (int i = 0; i < expected.Length; i++) + { + AssertTextMessageEqual(message + ". Index: " + i, expected[i], actual[i]); + } + } + + protected void AssertTextMessageEqual(String message, IMessage expected, IMessage actual) + { + Assert.IsTrue(expected is ITextMessage, "expected object not a text message"); + Assert.IsTrue(actual is ITextMessage, "actual object not a text message"); + + String expectedText = ((ITextMessage) expected).Text; + String actualText = ((ITextMessage) actual).Text; + + Assert.AreEqual(expectedText, actualText, message); + } + + protected void Connect() + { + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + + connection = factory.CreateConnection(); + + session = connection.CreateSession(AcknowledgementMode.Transactional); + + // reuse the same destination if we reconnect + if (destination == null) + { + destination = CreateDestination(session); + } + + consumer = session.CreateConsumer(destination); + + producer = session.CreateProducer(destination); + } + + + protected void Disconnect() + { + if (connection != null) + { + connection.Dispose(); + connection = null; + } + } + + protected void Reconnect() + { + Disconnect(); + Connect(); + } + + } +}