diff --git a/openwire-dotnet/activemq.csproj b/openwire-dotnet/activemq.csproj new file mode 100755 index 0000000000..62ba6fc6a0 --- /dev/null +++ b/openwire-dotnet/activemq.csproj @@ -0,0 +1,800 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/openwire-dotnet/activemq.sln b/openwire-dotnet/activemq.sln new file mode 100755 index 0000000000..f25ad1f3c6 --- /dev/null +++ b/openwire-dotnet/activemq.sln @@ -0,0 +1,30 @@ +Microsoft Visual Studio Solution File, Format Version 8.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "activemq", "activemq.csproj", "{E8825D2E-4EC5-43D3-9FAC-F16055294F49}" + ProjectSection(ProjectDependencies) = postProject + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "tests", "tests.csproj", "{3E5FA6F9-08E6-4F0D-86D9-77562D14F8AD}" + ProjectSection(ProjectDependencies) = postProject + {E8825D2E-4EC5-43D3-9FAC-F16055294F49} = {E8825D2E-4EC5-43D3-9FAC-F16055294F49} + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfiguration) = preSolution + Debug = Debug + Release = Release + EndGlobalSection + GlobalSection(ProjectConfiguration) = postSolution + {E8825D2E-4EC5-43D3-9FAC-F16055294F49}.Debug.ActiveCfg = Debug|.NET + {E8825D2E-4EC5-43D3-9FAC-F16055294F49}.Debug.Build.0 = Debug|.NET + {E8825D2E-4EC5-43D3-9FAC-F16055294F49}.Release.ActiveCfg = Release|.NET + {E8825D2E-4EC5-43D3-9FAC-F16055294F49}.Release.Build.0 = Release|.NET + {3E5FA6F9-08E6-4F0D-86D9-77562D14F8AD}.Debug.ActiveCfg = Debug|.NET + {3E5FA6F9-08E6-4F0D-86D9-77562D14F8AD}.Debug.Build.0 = Debug|.NET + {3E5FA6F9-08E6-4F0D-86D9-77562D14F8AD}.Release.ActiveCfg = Release|.NET + {3E5FA6F9-08E6-4F0D-86D9-77562D14F8AD}.Release.Build.0 = Release|.NET + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + EndGlobalSection + GlobalSection(ExtensibilityAddIns) = postSolution + EndGlobalSection +EndGlobal diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/Message.cs b/openwire-dotnet/src/OpenWire.Client/Commands/Message.cs index 8f9e3e3a0e..2652563349 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/Message.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/Message.cs @@ -1,19 +1,19 @@ /* -* Copyright 2006 The Apache Software Foundation or its licensors, as -* applicable. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ using System; using System.Collections; diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index 9ed59c8c9a..502c918d07 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -33,7 +33,20 @@ namespace OpenWire.Client this.transport.Command += new CommandHandler(OnCommand); this.transport.Start(); } + + /// + /// Starts message delivery for this connection. + /// + public void Start() + { + } + /// + /// Stop message delivery for this connection. + /// + public void Stop() + { + } /// /// Creates a new session to work on this connection diff --git a/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs b/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs index 3c59059083..703a25feaf 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/SocketTransport.cs @@ -109,17 +109,17 @@ namespace OpenWire.Client.Core { command = (Command) wireformat.Unmarshal(socketReader); } - catch (EndOfStreamException e) + catch (EndOfStreamException) { // stream closed break; } - catch (ObjectDisposedException e) + catch (ObjectDisposedException) { // stream closed break; } - catch (IOException e) + catch (IOException) { // error, assume closing break; diff --git a/openwire-dotnet/src/OpenWire.Client/IConnection.cs b/openwire-dotnet/src/OpenWire.Client/IConnection.cs index b44d49726e..54931745fa 100755 --- a/openwire-dotnet/src/OpenWire.Client/IConnection.cs +++ b/openwire-dotnet/src/OpenWire.Client/IConnection.cs @@ -11,7 +11,7 @@ namespace OpenWire.Client { /// /// Represents a connection with a message broker /// - public interface IConnection : IDisposable { + public interface IConnection : IDisposable, IStartable, IStopable { /// /// Creates a new session to work on this connection @@ -34,6 +34,14 @@ namespace OpenWire.Client { AcknowledgementMode AcknowledgementMode { get; set; - } - } + } + + String ClientId + { + get; + set; + } + + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IMessage.cs b/openwire-dotnet/src/OpenWire.Client/IMessage.cs index 5b2cb75bd5..23c77811e4 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessage.cs @@ -2,13 +2,15 @@ using System; using OpenWire.Client.Commands; namespace OpenWire.Client { - /// - /// Represents a message either to be sent to a message broker or received from a message broker - /// - public interface IMessage { + /// + /// Represents a message either to be sent to a message broker or received from a message broker + /// + public interface IMessage { - IDestination FromDestination { - get; - } - } + IDestination FromDestination { + get; + } + + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs index 74089be473..f9e4c55587 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -11,10 +11,15 @@ namespace OpenWire.Client public interface IMessageConsumer : IDisposable { - /// - /// Waits until a message is available and returns it + /// + /// Waits until a message is available and returns it + /// + IMessage Receive(); + + /// + /// If a message is available within the timeout duration it is returned otherwise this method returns null /// - IMessage Receive(); + IMessage Receive(long timeout); /// /// If a message is available immediately it is returned otherwise this method returns null diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs index 8ca6b6a019..ba0c3df07b 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs @@ -2,19 +2,50 @@ using System; using OpenWire.Client.Commands; namespace OpenWire.Client { + /// + /// An object capable of sending messages to some destination + /// + public interface IMessageProducer : IDisposable { + /// - /// An object capable of sending messages to some destination + /// Sends the message to the default destination for this producer /// - public interface IMessageProducer : IDisposable { + void Send(IMessage message); - /// - /// Sends the message to the default destination for this producer - /// - void Send(IMessage message); + /// + /// Sends the message to the given destination + /// + void Send(IDestination destination, IMessage message); - /// - /// Sends the message to the given destination - /// - void Send(IDestination destination, IMessage message); - } + bool Persistent + { + get; + set; + } + + long TimeToLive + { + get; + set; + } + + int Priority + { + get; + set; + } + + bool DisableMessageID + { + get; + set; + } + + bool DisableMessageTimestamp + { + get; + set; + } + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/ISession.cs b/openwire-dotnet/src/OpenWire.Client/ISession.cs index 3383ee41cf..ef7db89f57 100755 --- a/openwire-dotnet/src/OpenWire.Client/ISession.cs +++ b/openwire-dotnet/src/OpenWire.Client/ISession.cs @@ -2,57 +2,60 @@ using System; using OpenWire.Client.Commands; namespace OpenWire.Client { + /// + /// Represents a single unit of work on an IConnection. + /// So the ISession can be used to perform transactional receive and sends + /// + public interface ISession : IDisposable { + /// - /// Represents a single unit of work on an IConnection. - /// So the ISession can be used to perform transactional receive and sends + /// Creates a producer of messages /// - public interface ISession : IDisposable { + IMessageProducer CreateProducer(); + /// + /// Creates a producer of messages on a given destination + /// + IMessageProducer CreateProducer(IDestination destination); - /// - /// Creates a producer of messages - /// - IMessageProducer CreateProducer(); + /// + /// Creates a consumer of messages on a given destination + /// + IMessageConsumer CreateConsumer(IDestination destination); - /// - /// Creates a producer of messages on a given destination - /// - IMessageProducer CreateProducer(IDestination destination); + /// + /// Creates a consumer of messages on a given destination with a selector + /// + IMessageConsumer CreateConsumer(IDestination destination, string selector); - /// - /// Creates a cpmsi,er of messages on a given destination - /// - IMessageConsumer CreateConsumer(IDestination destination); - - /// - /// Creates a cpmsi,er of messages on a given destination with a selector - /// - IMessageConsumer CreateConsumer(IDestination destination, string selector); - - /// - /// Returns the queue for the given name - /// - IQueue GetQueue(string name); - - /// - /// Returns the topic for the given name - /// - ITopic GetTopic(string name); - + /// + /// Creates a named durable consumer of messages on a given destination with a selector + /// + IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal); + + /// + /// Returns the queue for the given name + /// + IQueue GetQueue(string name); - /// - /// Creates a new message with an empty body - /// - IMessage CreateMessage(); - - /// - /// Creates a new text message with an empty body - /// - ITextMessage CreateTextMessage(); - - /// - /// Creates a new text message with the given body - /// - ITextMessage CreateTextMessage(string text); - } + /// + /// Returns the topic for the given name + /// + ITopic GetTopic(string name); + + /// + /// Creates a new message with an empty body + /// + IMessage CreateMessage(); + + /// + /// Creates a new text message with an empty body + /// + ITextMessage CreateTextMessage(); + + /// + /// Creates a new text message with the given body + /// + ITextMessage CreateTextMessage(string text); + } } diff --git a/openwire-dotnet/src/OpenWire.Client/IStopable.cs b/openwire-dotnet/src/OpenWire.Client/IStopable.cs new file mode 100755 index 0000000000..c67c4c98c6 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IStopable.cs @@ -0,0 +1,7 @@ +namespace OpenWire.Client +{ + public interface IStopable + { + void Stop(); + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs index 59129f8b08..622df3feb2 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs @@ -12,8 +12,13 @@ namespace OpenWire.Client private Session session; private ProducerInfo info; private long messageCounter; - - + + bool persistent; + long timeToLive; + int priority; + bool disableMessageID; + bool disableMessageTimestamp; + public MessageProducer(Session session, ProducerInfo info) { this.session = session; @@ -45,5 +50,36 @@ namespace OpenWire.Client { session.DisposeOf(info.ProducerId); } + + public bool Persistent + { + get { return persistent; } + set { this.persistent = value; } + } + + public long TimeToLive + { + get { return timeToLive; } + set { this.timeToLive = value; } + } + + public int Priority + { + get { return priority; } + set { this.priority = value; } + } + + public bool DisableMessageID + { + get { return disableMessageID; } + set { this.disableMessageID = value; } + } + + public bool DisableMessageTimestamp + { + get { return disableMessageTimestamp; } + set { this.disableMessageTimestamp = value; } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index 941a4cf808..f39fc7ef5d 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -10,7 +10,7 @@ namespace OpenWire.Client public class Session : ISession { private Connection connection; - private AcknowledgementMode acknowledgementMode; + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private SessionInfo info; private long consumerCounter; private long producerCounter; @@ -75,6 +75,29 @@ namespace OpenWire.Client } } + public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) + { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + ConsumerId consumerId = command.ConsumerId; + command.SubcriptionName = name; + command.NoLocal = noLocal; + + try + { + MessageConsumer consumer = new MessageConsumer(this, command); + // lets register the consumer first in case we start dispatching messages immediately + connection.AddConsumer(consumerId, consumer); + + connection.SyncRequest(command); + return consumer; + } + catch (Exception e) + { + connection.RemoveConsumer(consumerId); + throw e; + } + } + public IQueue GetQueue(string name) { return new ActiveMQQueue(name); diff --git a/openwire-dotnet/tests.csproj b/openwire-dotnet/tests.csproj new file mode 100755 index 0000000000..5a15beb15c --- /dev/null +++ b/openwire-dotnet/tests.csproj @@ -0,0 +1,131 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs b/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs new file mode 100755 index 0000000000..bea9c7cfb2 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs @@ -0,0 +1,76 @@ +using System; +using System.IO; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; + +namespace OpenWire.Client +{ + [TestFixture] + public class ConsumerTest : TestSupport + { + IConnectionFactory factory; + IConnection connection; + IDestination destination; + + [SetUp] + protected void SetUp() + { + factory = new ConnectionFactory("localhost", 61616); + connection = factory.CreateConnection(); + } + + [TearDown] + protected void TearDown() + { + connection.Dispose(); + } + + [Test] + [Ignore("Not fully implemented yet.")] + public void testDurableConsumerSelectorChange() + { + + // Receive a message with the JMS API + connection.ClientId="test"; + connection.Start(); + + ISession session = connection.CreateSession(false, AcknowledgementMode.AutoAcknowledge); + destination = session.GetTopic("foo"); + IMessageProducer producer = session.CreateProducer(destination); + producer.Persistent = true; + IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='red'", false); + + // Send the messages + ITextMessage message = session.CreateTextMessage("1st"); + //message.SetStringProperty("color", "red"); + producer.Send(message); + + IMessage m = consumer.Receive(1000); + Assert.IsNotNull(m); + Assert.AreEqual("1st", ((ITextMessage)m).Text ); + + // Change the subscription. + consumer.Dispose(); + consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='blue'", false); + + message = session.CreateTextMessage("2nd"); + // message.setStringProperty("color", "red"); + producer.Send(message); + message = session.CreateTextMessage("3rd"); + // message.setStringProperty("color", "blue"); + producer.Send(message); + + // Selector should skip the 2nd message. + m = consumer.Receive(1000); + Assert.IsNotNull(m); + Assert.AreEqual("3rd", ((ITextMessage)m).Text); + + Assert.IsNull(consumer.ReceiveNoWait()); + } + + } +} + diff --git a/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs b/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs index c0049fbcd0..932e8cb7e4 100644 --- a/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/EndianTest.cs @@ -11,7 +11,7 @@ namespace openwire_dotnet [Test] public void TestLongEndian() { - long value = 0x0102030405060708l; + long value = 0x0102030405060708L; long newValue = DataStreamMarshaller.SwitchEndian(value);