From 8f8bfee0be58f4c4665a52496db2903ae26991d0 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Sun, 5 Mar 2006 12:05:53 +0000 Subject: [PATCH] Implemented the filtered layer concept similar to what we have with the Java implemenation. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383309 13f79535-47bb-0310-9956-ffa450edef68 --- openwire-dotnet/src/ActiveMQ/Connection.cs | 10 +- .../src/ActiveMQ/ConnectionFactory.cs | 29 +-- .../src/ActiveMQ/Transport/ITransport.cs | 25 +- .../ActiveMQ/Transport/ITransportFactory.cs | 26 ++ .../ActiveMQ/Transport/LoggingTransport.cs | 50 ++++ .../src/ActiveMQ/Transport/MutexTransport.cs | 70 ++++++ .../ActiveMQ/Transport/ResponseCorrelator.cs | 104 ++++++++ .../src/ActiveMQ/Transport/SocketTransport.cs | 227 ------------------ .../ActiveMQ/Transport/Tcp/TcpTransport.cs | 145 +++++++++++ .../Transport/Tcp/TcpTransportFactory.cs | 62 +++++ .../src/ActiveMQ/Transport/TransportFilter.cs | 109 +++++++++ .../src/JMS/JMSConnectionException.cs | 32 +++ openwire-dotnet/src/JMS/JMSException.cs | 32 +++ openwire-dotnet/src/src.csproj | 8 +- openwire-dotnet/tests/ActiveMQ/TestMain.cs | 2 +- openwire-dotnet/tests/JMS/JMSTestSupport.cs | 2 +- 16 files changed, 668 insertions(+), 265 deletions(-) create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/ITransportFactory.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/LoggingTransport.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/MutexTransport.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/ResponseCorrelator.cs delete mode 100755 openwire-dotnet/src/ActiveMQ/Transport/SocketTransport.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransport.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs create mode 100644 openwire-dotnet/src/ActiveMQ/Transport/TransportFilter.cs create mode 100644 openwire-dotnet/src/JMS/JMSConnectionException.cs create mode 100644 openwire-dotnet/src/JMS/JMSException.cs diff --git a/openwire-dotnet/src/ActiveMQ/Connection.cs b/openwire-dotnet/src/ActiveMQ/Connection.cs index c0fcaca4bc..9e481ffe0b 100755 --- a/openwire-dotnet/src/ActiveMQ/Connection.cs +++ b/openwire-dotnet/src/ActiveMQ/Connection.cs @@ -29,7 +29,8 @@ namespace ActiveMQ { this.transport = transport; this.info = info; - this.transport.Command += new CommandHandler(OnCommand); + this.transport.Command = new CommandHandler(OnCommand); + this.transport.Exception = new ExceptionHandler(OnException); this.transport.Start(); } @@ -255,9 +256,14 @@ namespace ActiveMQ } else { - Console.WriteLine("ERROR:ÊUnknown command: " + command); + Console.WriteLine("ERROR: Unknown command: " + command); } } + + protected void OnException(ITransport sender, Exception exception) { + Console.WriteLine("ERROR: Transport Exception: " + exception); + } + protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode) { diff --git a/openwire-dotnet/src/ActiveMQ/ConnectionFactory.cs b/openwire-dotnet/src/ActiveMQ/ConnectionFactory.cs index e78fd7d3b3..fe01636ec0 100755 --- a/openwire-dotnet/src/ActiveMQ/ConnectionFactory.cs +++ b/openwire-dotnet/src/ActiveMQ/ConnectionFactory.cs @@ -16,6 +16,7 @@ */ using ActiveMQ.Commands; using ActiveMQ.Transport; +using ActiveMQ.Transport.Tcp; using JMS; using System; @@ -26,8 +27,7 @@ namespace ActiveMQ /// public class ConnectionFactory : IConnectionFactory { - private string host = "localhost"; - private int port = 61616; + private Uri brokerUri = new Uri("tcp://localhost:61616"); private string userName; private string password; private string clientId; @@ -36,10 +36,9 @@ namespace ActiveMQ { } - public ConnectionFactory(string host, int port) + public ConnectionFactory(Uri brokerUri) { - this.host = host; - this.port = port; + this.brokerUri=brokerUri; } public IConnection CreateConnection() @@ -50,7 +49,7 @@ namespace ActiveMQ public IConnection CreateConnection(string userName, string password) { ConnectionInfo info = CreateConnectionInfo(userName, password); - ITransport transport = CreateTransport(); + ITransport transport = new TcpTransportFactory().CreateTransport(brokerUri); Connection connection = new Connection(transport, info); connection.ClientId = info.ClientId; return connection; @@ -58,18 +57,12 @@ namespace ActiveMQ // Properties - public string Host + public Uri BrokerUri { - get { return host; } - set { host = value; } + get { return brokerUri; } + set { brokerUri = value; } } - - public int Port - { - get { return port; } - set { port = value; } - } - + public string UserName { get { return userName; } @@ -112,9 +105,5 @@ namespace ActiveMQ return Guid.NewGuid().ToString(); } - protected ITransport CreateTransport() - { - return new SocketTransport(host, port); - } } } diff --git a/openwire-dotnet/src/ActiveMQ/Transport/ITransport.cs b/openwire-dotnet/src/ActiveMQ/Transport/ITransport.cs index 77179149d1..20c1a2f9c6 100755 --- a/openwire-dotnet/src/ActiveMQ/Transport/ITransport.cs +++ b/openwire-dotnet/src/ActiveMQ/Transport/ITransport.cs @@ -19,22 +19,14 @@ using ActiveMQ.Transport; using JMS; using System; - - -namespace ActiveMQ.Transport -{ - public delegate void CommandHandler(ITransport sender, Command command); -} -namespace ActiveMQ.Transport -{ - public delegate void ExceptionHandler(ITransport sender, Exception command); -} - /// /// Represents the logical networking transport layer. /// namespace ActiveMQ.Transport { + public delegate void CommandHandler(ITransport sender, Command command); + public delegate void ExceptionHandler(ITransport sender, Exception command); + public interface ITransport : IStartable, IDisposable { void Oneway(Command command); @@ -43,8 +35,15 @@ namespace ActiveMQ.Transport Response Request(Command command); - event CommandHandler Command; - event ExceptionHandler Exception; + CommandHandler Command{ + get; + set; + } + + ExceptionHandler Exception{ + get; + set; + } } } diff --git a/openwire-dotnet/src/ActiveMQ/Transport/ITransportFactory.cs b/openwire-dotnet/src/ActiveMQ/Transport/ITransportFactory.cs new file mode 100644 index 0000000000..6fa23fd6d9 --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/ITransportFactory.cs @@ -0,0 +1,26 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; + +namespace ActiveMQ.Transport +{ + public interface ITransportFactory + { + ITransport CreateTransport(Uri location); + } +} diff --git a/openwire-dotnet/src/ActiveMQ/Transport/LoggingTransport.cs b/openwire-dotnet/src/ActiveMQ/Transport/LoggingTransport.cs new file mode 100644 index 0000000000..d71aa7cb74 --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/LoggingTransport.cs @@ -0,0 +1,50 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using ActiveMQ.Commands; +using ActiveMQ.Transport; +using JMS; +using System; + +/// +/// A Transport filter that is used to log the commands sent and received. +/// +namespace ActiveMQ.Transport +{ + public class LoggingTransport : TransportFilter + { + public LoggingTransport(ITransport next) : base(next) { + } + + protected override void OnCommand(ITransport sender, Command command) { + Console.WriteLine("RECEIVED: " + command); + this.command(sender, command); + } + + protected override void OnException(ITransport sender, Exception error) { + Console.WriteLine("RECEIVED Exception: " + error); + this.exception(sender, error); + } + + public override void Oneway(Command command) + { + Console.WriteLine("SENDING: " + command); + this.next.Oneway(command); + } + + } +} + diff --git a/openwire-dotnet/src/ActiveMQ/Transport/MutexTransport.cs b/openwire-dotnet/src/ActiveMQ/Transport/MutexTransport.cs new file mode 100644 index 0000000000..f4d02cd85b --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/MutexTransport.cs @@ -0,0 +1,70 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using ActiveMQ.Commands; +using ActiveMQ.Transport; +using JMS; +using System; + +/// +/// A Transport which gaurds access to the next transport using a mutex. +/// +namespace ActiveMQ.Transport +{ + public class MutexTransport : TransportFilter + { + + private readonly object transmissionLock = new object(); + + public MutexTransport(ITransport next) : base(next) { + } + + + public override void Oneway(Command command) + { + lock (transmissionLock) + { + this.next.Oneway(command); + } + } + + public override FutureResponse AsyncRequest(Command command) + { + lock (transmissionLock) + { + return base.AsyncRequest(command); + } + } + + public override Response Request(Command command) + { + lock (transmissionLock) + { + return base.Request(command); + } + } + + public override void Dispose() + { + lock (transmissionLock) + { + base.Dispose(); + } + } + + } +} + diff --git a/openwire-dotnet/src/ActiveMQ/Transport/ResponseCorrelator.cs b/openwire-dotnet/src/ActiveMQ/Transport/ResponseCorrelator.cs new file mode 100644 index 0000000000..c7e3c6d787 --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/ResponseCorrelator.cs @@ -0,0 +1,104 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections; + +using ActiveMQ.Commands; +using ActiveMQ.Transport; +using JMS; + +/// +/// A Transport which gaurds access to the next transport using a mutex. +/// +namespace ActiveMQ.Transport +{ + public class ResponseCorrelator : TransportFilter + { + + private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable()); + private readonly Object mutex = new Object(); + private short nextCommandId; + + public ResponseCorrelator(ITransport next) : base(next) { + } + + short GetNextCommandId() { + lock(mutex) { + return ++nextCommandId; + } + } + + public override void Oneway(Command command) + { + command.CommandId = GetNextCommandId(); + command.ResponseRequired = false; + next.Oneway(command); + } + + public override FutureResponse AsyncRequest(Command command) + { + command.CommandId = GetNextCommandId(); + command.ResponseRequired = true; + FutureResponse future = new FutureResponse(); + requestMap[command.CommandId] = future; + next.Oneway(command); + return future; + + } + + public override Response Request(Command command) + { + FutureResponse future = AsyncRequest(command); + Response response = future.Response; + if (response is ExceptionResponse) + { + ExceptionResponse er = (ExceptionResponse) response; + BrokerError brokerError = er.Exception; + throw new BrokerException(brokerError); + } + return response; + } + + protected override void OnCommand(ITransport sender, Command command) + { + if( command is Response ) { + + Response response = (Response) command; + FutureResponse future = (FutureResponse) requestMap[response.CorrelationId]; + if (future != null) + { + if (response is ExceptionResponse) + { + ExceptionResponse er = (ExceptionResponse) response; + BrokerError brokerError = er.Exception; + this.exception(this, new BrokerException(brokerError)); + } + future.Response = response; + } + else + { + Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response); + } + } else { + this.command(sender, command); + } + } + + } +} + diff --git a/openwire-dotnet/src/ActiveMQ/Transport/SocketTransport.cs b/openwire-dotnet/src/ActiveMQ/Transport/SocketTransport.cs deleted file mode 100755 index a2c8b66ce9..0000000000 --- a/openwire-dotnet/src/ActiveMQ/Transport/SocketTransport.cs +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright 2006 The Apache Software Foundation or its licensors, as - * applicable. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -using ActiveMQ.Commands; -using ActiveMQ.OpenWire; -using ActiveMQ.Transport; -using System; -using System.Collections; -using System.IO; -using System.Net; -using System.Net.Sockets; -using System.Threading; - - - -/// -/// An implementation of ITransport that uses sockets to communicate with the broker -/// -namespace ActiveMQ.Transport -{ - public class SocketTransport : ITransport - { - private readonly object transmissionLock = new object(); - private Socket socket; - private OpenWireFormat wireformat = new OpenWireFormat(); - private BinaryReader socketReader; - private BinaryWriter socketWriter; - private Thread readThread; - private bool closed; - private IDictionary requestMap = new Hashtable(); // TODO threadsafe - private short nextCommandId; - private bool started; - - public event CommandHandler Command; - public event ExceptionHandler Exception; - - - - public SocketTransport(string host, int port) - { - //Console.WriteLine("Opening socket to: " + host + " on port: " + port); - socket = Connect(host, port); - } - - /// - /// Method Start - /// - public void Start() - { - if (!started) - { - started = true; - - NetworkStream networkStream = new NetworkStream(socket); - socketWriter = new BinaryWriter(networkStream); - socketReader = new BinaryReader(networkStream); - - // now lets create the background read thread - readThread = new Thread(new ThreadStart(ReadLoop)); - readThread.Start(); - - // lets send the wireformat we're using - Oneway(wireformat.WireFormatInfo); - } - } - - - public void Oneway(Command command) - { - command.CommandId = GetNextCommandId(); - command.ResponseRequired = false; - Send(command); - } - - public FutureResponse AsyncRequest(Command command) - { - command.CommandId = GetNextCommandId(); - command.ResponseRequired = true; - Send(command); - FutureResponse future = new FutureResponse(); - requestMap[command.CommandId] = future; - return future; - } - - public Response Request(Command command) - { - FutureResponse future = AsyncRequest(command); - Response response = future.Response; - if (response is ExceptionResponse) - { - ExceptionResponse er = (ExceptionResponse) response; - BrokerError brokerError = er.Exception; - throw new BrokerException(brokerError); - } - - return response; - } - - public void Dispose() - { - lock (transmissionLock) - { - socket.Close(); - closed = true; - } - socketWriter.Close(); - socketReader.Close(); - } - - public void ReadLoop() - { - while (!closed) - { - Command command = null; - try - { - command = (Command) wireformat.Unmarshal(socketReader); - } - catch (EndOfStreamException) - { - // stream closed - break; - } - catch (ObjectDisposedException) - { - // stream closed - break; - } - catch (IOException) - { - // error, assume closing - break; - } - if (command is Response) - { - Response response = (Response) command; - FutureResponse future = (FutureResponse) requestMap[response.CorrelationId]; - if (future != null) - { - if (response is ExceptionResponse) - { - ExceptionResponse er = (ExceptionResponse) response; - BrokerError brokerError = er.Exception; - if (this.Exception != null) - { - this.Exception(this, new BrokerException(brokerError)); - } - } - future.Response = response; - } - else - { - Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response); - } - } - else - { - if (this.Command != null) - { - this.Command(this, command); - } - else - { - Console.WriteLine("ERROR: No handler available to process command: " + command); - } - } - } - } - - - // Implementation methods - - protected void Send(Command command) - { - lock (transmissionLock) - { - //Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired); - - wireformat.Marshal(command, socketWriter); - socketWriter.Flush(); - } - } - - protected short GetNextCommandId() - { - lock (transmissionLock) - { - return++nextCommandId; - } - } - - protected Socket Connect(string host, int port) - { - // Looping through the AddressList allows different type of connections to be tried - // (IPv4, IPv6 and whatever else may be available). - IPHostEntry hostEntry = Dns.Resolve(host); - foreach (IPAddress address in hostEntry.AddressList) - { - Socket socket = new Socket( - address.AddressFamily, - SocketType.Stream, - ProtocolType.Tcp); - socket.Connect(new IPEndPoint(address, port)); - if (socket.Connected) - { - return socket; - } - } - throw new SocketException(); - } - } -} - - diff --git a/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransport.cs b/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransport.cs new file mode 100644 index 0000000000..c79301548c --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransport.cs @@ -0,0 +1,145 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using ActiveMQ; +using ActiveMQ.Commands; +using ActiveMQ.OpenWire; +using ActiveMQ.Transport; +using System; +using System.Collections; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Threading; + + + +/// +/// An implementation of ITransport that uses sockets to communicate with the broker +/// + +namespace ActiveMQ.Transport.Tcp +{ + public class TcpTransport : ITransport + { + private Socket socket; + private OpenWireFormat wireformat = new OpenWireFormat(); + private BinaryReader socketReader; + private BinaryWriter socketWriter; + private Thread readThread; + private bool started; + volatile private bool closed; + + public CommandHandler command; + public ExceptionHandler exception; + + public TcpTransport(Socket socket) + { + this.socket = socket; + } + + /// + /// Method Start + /// + public void Start() + { + if (!started) + { + if( command == null ) + throw new InvalidOperationException ("command cannot be null when Start is called."); + if( exception == null ) + throw new InvalidOperationException ("exception cannot be null when Start is called."); + + started = true; + + NetworkStream networkStream = new NetworkStream(socket); + socketWriter = new BinaryWriter(networkStream); + socketReader = new BinaryReader(networkStream); + + // now lets create the background read thread + readThread = new Thread(new ThreadStart(ReadLoop)); + readThread.Start(); + + // lets send the wireformat we're using + Oneway(wireformat.WireFormatInfo); + } + } + + public void Oneway(Command command) + { + wireformat.Marshal(command, socketWriter); + socketWriter.Flush(); + } + + public FutureResponse AsyncRequest(Command command) + { + throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls"); + } + + public Response Request(Command command) + { + throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls"); + } + + public void Dispose() + { + closed = true; + socket.Close(); + readThread.Join(); + socketWriter.Close(); + socketReader.Close(); + } + + public void ReadLoop() + { + while (!closed) + { + try + { + Command command = (Command) wireformat.Unmarshal(socketReader); + this.command(this, command); + } + catch (ObjectDisposedException) + { + break; + } + catch (Exception e) + { + this.exception(this,e); + } + } + } + + + + + // Implementation methods + + public CommandHandler Command { + get { return command; } + set { this.command = value; } + } + + public ExceptionHandler Exception { + get { return exception; } + set { this.exception = value; } + } + + } +} + + + diff --git a/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs b/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs new file mode 100644 index 0000000000..082962a9e6 --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs @@ -0,0 +1,62 @@ +/* +* Copyright 2006 The Apache Software Foundation or its licensors, as +* applicable. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Net; +using System.Net.Sockets; +using ActiveMQ.Transport; + +namespace ActiveMQ.Transport.Tcp +{ + public class TcpTransportFactory : ITransportFactory + { + public ITransport CreateTransport(Uri location) { + + // Console.WriteLine("Opening socket to: " + host + " on port: " + port); + Socket socket = Connect(location.Host, location.Port); + ITransport rc = new TcpTransport(socket); + // TODO: use URI query string to enable the LoggingTransport + // rc = new LoggingTransport(rc); + rc = new ResponseCorrelator(rc); + rc = new MutexTransport(rc); + return rc; + + } + + protected Socket Connect(string host, int port) + { + // Looping through the AddressList allows different type of connections to be tried + // (IPv4, IPv6 and whatever else may be available). + IPHostEntry hostEntry = Dns.Resolve(host); + foreach (IPAddress address in hostEntry.AddressList) + { + Socket socket = new Socket( + address.AddressFamily, + SocketType.Stream, + ProtocolType.Tcp); + socket.Connect(new IPEndPoint(address, port)); + if (socket.Connected) + { + return socket; + } + } + throw new SocketException(); + } + + } + +} diff --git a/openwire-dotnet/src/ActiveMQ/Transport/TransportFilter.cs b/openwire-dotnet/src/ActiveMQ/Transport/TransportFilter.cs new file mode 100644 index 0000000000..dbc8b6482b --- /dev/null +++ b/openwire-dotnet/src/ActiveMQ/Transport/TransportFilter.cs @@ -0,0 +1,109 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using ActiveMQ.Commands; +using ActiveMQ.Transport; +using JMS; +using System; + +/// +/// Used to implement a filter on the transport layer. +/// +namespace ActiveMQ.Transport +{ + public class TransportFilter : ITransport + { + protected readonly ITransport next; + protected CommandHandler command; + protected ExceptionHandler exception; + + public TransportFilter(ITransport next) { + this.next = next; + this.next.Command = new CommandHandler(OnCommand); + this.next.Exception = new ExceptionHandler(OnException); + } + + protected virtual void OnCommand(ITransport sender, Command command) { + this.command(sender, command); + } + + protected virtual void OnException(ITransport sender, Exception command) { + this.exception(sender, command); + } + + + /// + /// Method Oneway + /// + /// A Command + public virtual void Oneway(Command command) + { + this.next.Oneway(command); + } + + /// + /// Method AsyncRequest + /// + /// A FutureResponse + /// A Command + public virtual FutureResponse AsyncRequest(Command command) + { + return this.next.AsyncRequest(command); + } + + /// + /// Method Request + /// + /// A Response + /// A Command + public virtual Response Request(Command command) + { + return this.next.Request(command); + } + + /// + /// Method Start + /// + public virtual void Start() + { + if( command == null ) + throw new InvalidOperationException ("command cannot be null when Start is called."); + if( exception == null ) + throw new InvalidOperationException ("exception cannot be null when Start is called."); + this.next.Start(); + } + + /// + /// Method Dispose + /// + public virtual void Dispose() + { + this.next.Dispose(); + } + + public CommandHandler Command { + get { return command; } + set { this.command = value; } + } + + public ExceptionHandler Exception { + get { return exception; } + set { this.exception = value; } + } + + } +} + diff --git a/openwire-dotnet/src/JMS/JMSConnectionException.cs b/openwire-dotnet/src/JMS/JMSConnectionException.cs new file mode 100644 index 0000000000..99769225fb --- /dev/null +++ b/openwire-dotnet/src/JMS/JMSConnectionException.cs @@ -0,0 +1,32 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; + + +/// +/// Represents a connection failure. +/// +namespace JMS +{ + public class ConnectionException : JMSException + { + public JMSException(string message) : base(message) + { + } + } +} + diff --git a/openwire-dotnet/src/JMS/JMSException.cs b/openwire-dotnet/src/JMS/JMSException.cs new file mode 100644 index 0000000000..1bf725acd6 --- /dev/null +++ b/openwire-dotnet/src/JMS/JMSException.cs @@ -0,0 +1,32 @@ +/* + * Copyright 2006 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; + + +/// +/// Represents a JMS exception +/// +namespace JMS +{ + public class JMSException : Exception + { + public JMSException(string message) : base(message) + { + } + } +} + diff --git a/openwire-dotnet/src/src.csproj b/openwire-dotnet/src/src.csproj index bdd3b9a3e0..3fe9200bd6 100644 --- a/openwire-dotnet/src/src.csproj +++ b/openwire-dotnet/src/src.csproj @@ -164,7 +164,13 @@ - + + + + + + + diff --git a/openwire-dotnet/tests/ActiveMQ/TestMain.cs b/openwire-dotnet/tests/ActiveMQ/TestMain.cs index 8d645ad830..8829a2566f 100644 --- a/openwire-dotnet/tests/ActiveMQ/TestMain.cs +++ b/openwire-dotnet/tests/ActiveMQ/TestMain.cs @@ -29,7 +29,7 @@ namespace ActiveMQ Console.WriteLine("About to connect to ActiveMQ"); // START SNIPPET: demo - IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616")); using (IConnection connection = factory.CreateConnection()) { Console.WriteLine("Created a connection!"); diff --git a/openwire-dotnet/tests/JMS/JMSTestSupport.cs b/openwire-dotnet/tests/JMS/JMSTestSupport.cs index 8f011319d1..82d8f76692 100644 --- a/openwire-dotnet/tests/JMS/JMSTestSupport.cs +++ b/openwire-dotnet/tests/JMS/JMSTestSupport.cs @@ -113,7 +113,7 @@ namespace JMS } protected virtual IConnectionFactory CreateConnectionFactory() { - return new ActiveMQ.ConnectionFactory("localhost", 61616); + return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616")); } protected virtual IConnection CreateConnection()