Implemented the filtered layer concept similar to what we have with the Java implemenation.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383309 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-05 12:05:53 +00:00
parent 494e5bdd2a
commit 8f8bfee0be
16 changed files with 668 additions and 265 deletions

View File

@ -29,7 +29,8 @@ namespace ActiveMQ
{
this.transport = transport;
this.info = info;
this.transport.Command += new CommandHandler(OnCommand);
this.transport.Command = new CommandHandler(OnCommand);
this.transport.Exception = new ExceptionHandler(OnException);
this.transport.Start();
}
@ -255,9 +256,14 @@ namespace ActiveMQ
}
else
{
Console.WriteLine("ERROR:ÊUnknown command: " + command);
Console.WriteLine("ERROR: Unknown command: " + command);
}
}
protected void OnException(ITransport sender, Exception exception) {
Console.WriteLine("ERROR: Transport Exception: " + exception);
}
protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
{

View File

@ -16,6 +16,7 @@
*/
using ActiveMQ.Commands;
using ActiveMQ.Transport;
using ActiveMQ.Transport.Tcp;
using JMS;
using System;
@ -26,8 +27,7 @@ namespace ActiveMQ
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
private string host = "localhost";
private int port = 61616;
private Uri brokerUri = new Uri("tcp://localhost:61616");
private string userName;
private string password;
private string clientId;
@ -36,10 +36,9 @@ namespace ActiveMQ
{
}
public ConnectionFactory(string host, int port)
public ConnectionFactory(Uri brokerUri)
{
this.host = host;
this.port = port;
this.brokerUri=brokerUri;
}
public IConnection CreateConnection()
@ -50,7 +49,7 @@ namespace ActiveMQ
public IConnection CreateConnection(string userName, string password)
{
ConnectionInfo info = CreateConnectionInfo(userName, password);
ITransport transport = CreateTransport();
ITransport transport = new TcpTransportFactory().CreateTransport(brokerUri);
Connection connection = new Connection(transport, info);
connection.ClientId = info.ClientId;
return connection;
@ -58,18 +57,12 @@ namespace ActiveMQ
// Properties
public string Host
public Uri BrokerUri
{
get { return host; }
set { host = value; }
get { return brokerUri; }
set { brokerUri = value; }
}
public int Port
{
get { return port; }
set { port = value; }
}
public string UserName
{
get { return userName; }
@ -112,9 +105,5 @@ namespace ActiveMQ
return Guid.NewGuid().ToString();
}
protected ITransport CreateTransport()
{
return new SocketTransport(host, port);
}
}
}

View File

@ -19,22 +19,14 @@ using ActiveMQ.Transport;
using JMS;
using System;
namespace ActiveMQ.Transport
{
public delegate void CommandHandler(ITransport sender, Command command);
}
namespace ActiveMQ.Transport
{
public delegate void ExceptionHandler(ITransport sender, Exception command);
}
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
namespace ActiveMQ.Transport
{
public delegate void CommandHandler(ITransport sender, Command command);
public delegate void ExceptionHandler(ITransport sender, Exception command);
public interface ITransport : IStartable, IDisposable
{
void Oneway(Command command);
@ -43,8 +35,15 @@ namespace ActiveMQ.Transport
Response Request(Command command);
event CommandHandler Command;
event ExceptionHandler Exception;
CommandHandler Command{
get;
set;
}
ExceptionHandler Exception{
get;
set;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
namespace ActiveMQ.Transport
{
public interface ITransportFactory
{
ITransport CreateTransport(Uri location);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using ActiveMQ.Transport;
using JMS;
using System;
/// <summary>
/// A Transport filter that is used to log the commands sent and received.
/// </summary>
namespace ActiveMQ.Transport
{
public class LoggingTransport : TransportFilter
{
public LoggingTransport(ITransport next) : base(next) {
}
protected override void OnCommand(ITransport sender, Command command) {
Console.WriteLine("RECEIVED: " + command);
this.command(sender, command);
}
protected override void OnException(ITransport sender, Exception error) {
Console.WriteLine("RECEIVED Exception: " + error);
this.exception(sender, error);
}
public override void Oneway(Command command)
{
Console.WriteLine("SENDING: " + command);
this.next.Oneway(command);
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using ActiveMQ.Transport;
using JMS;
using System;
/// <summary>
/// A Transport which gaurds access to the next transport using a mutex.
/// </summary>
namespace ActiveMQ.Transport
{
public class MutexTransport : TransportFilter
{
private readonly object transmissionLock = new object();
public MutexTransport(ITransport next) : base(next) {
}
public override void Oneway(Command command)
{
lock (transmissionLock)
{
this.next.Oneway(command);
}
}
public override FutureResponse AsyncRequest(Command command)
{
lock (transmissionLock)
{
return base.AsyncRequest(command);
}
}
public override Response Request(Command command)
{
lock (transmissionLock)
{
return base.Request(command);
}
}
public override void Dispose()
{
lock (transmissionLock)
{
base.Dispose();
}
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using ActiveMQ.Commands;
using ActiveMQ.Transport;
using JMS;
/// <summary>
/// A Transport which gaurds access to the next transport using a mutex.
/// </summary>
namespace ActiveMQ.Transport
{
public class ResponseCorrelator : TransportFilter
{
private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
private readonly Object mutex = new Object();
private short nextCommandId;
public ResponseCorrelator(ITransport next) : base(next) {
}
short GetNextCommandId() {
lock(mutex) {
return ++nextCommandId;
}
}
public override void Oneway(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = false;
next.Oneway(command);
}
public override FutureResponse AsyncRequest(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = true;
FutureResponse future = new FutureResponse();
requestMap[command.CommandId] = future;
next.Oneway(command);
return future;
}
public override Response Request(Command command)
{
FutureResponse future = AsyncRequest(command);
Response response = future.Response;
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
throw new BrokerException(brokerError);
}
return response;
}
protected override void OnCommand(ITransport sender, Command command)
{
if( command is Response ) {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
if (future != null)
{
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
this.exception(this, new BrokerException(brokerError));
}
future.Response = response;
}
else
{
Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
}
} else {
this.command(sender, command);
}
}
}
}

View File

@ -1,227 +0,0 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using ActiveMQ.OpenWire;
using ActiveMQ.Transport;
using System;
using System.Collections;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
namespace ActiveMQ.Transport
{
public class SocketTransport : ITransport
{
private readonly object transmissionLock = new object();
private Socket socket;
private OpenWireFormat wireformat = new OpenWireFormat();
private BinaryReader socketReader;
private BinaryWriter socketWriter;
private Thread readThread;
private bool closed;
private IDictionary requestMap = new Hashtable(); // TODO threadsafe
private short nextCommandId;
private bool started;
public event CommandHandler Command;
public event ExceptionHandler Exception;
public SocketTransport(string host, int port)
{
//Console.WriteLine("Opening socket to: " + host + " on port: " + port);
socket = Connect(host, port);
}
/// <summary>
/// Method Start
/// </summary>
public void Start()
{
if (!started)
{
started = true;
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
// lets send the wireformat we're using
Oneway(wireformat.WireFormatInfo);
}
}
public void Oneway(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = false;
Send(command);
}
public FutureResponse AsyncRequest(Command command)
{
command.CommandId = GetNextCommandId();
command.ResponseRequired = true;
Send(command);
FutureResponse future = new FutureResponse();
requestMap[command.CommandId] = future;
return future;
}
public Response Request(Command command)
{
FutureResponse future = AsyncRequest(command);
Response response = future.Response;
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
throw new BrokerException(brokerError);
}
return response;
}
public void Dispose()
{
lock (transmissionLock)
{
socket.Close();
closed = true;
}
socketWriter.Close();
socketReader.Close();
}
public void ReadLoop()
{
while (!closed)
{
Command command = null;
try
{
command = (Command) wireformat.Unmarshal(socketReader);
}
catch (EndOfStreamException)
{
// stream closed
break;
}
catch (ObjectDisposedException)
{
// stream closed
break;
}
catch (IOException)
{
// error, assume closing
break;
}
if (command is Response)
{
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
if (future != null)
{
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
if (this.Exception != null)
{
this.Exception(this, new BrokerException(brokerError));
}
}
future.Response = response;
}
else
{
Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
}
}
else
{
if (this.Command != null)
{
this.Command(this, command);
}
else
{
Console.WriteLine("ERROR: No handler available to process command: " + command);
}
}
}
}
// Implementation methods
protected void Send(Command command)
{
lock (transmissionLock)
{
//Console.WriteLine("Sending command: " + command + " with ID: " + command.CommandId + " response: " + command.ResponseRequired);
wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
}
protected short GetNextCommandId()
{
lock (transmissionLock)
{
return++nextCommandId;
}
}
protected Socket Connect(string host, int port)
{
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
IPHostEntry hostEntry = Dns.Resolve(host);
foreach (IPAddress address in hostEntry.AddressList)
{
Socket socket = new Socket(
address.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
socket.Connect(new IPEndPoint(address, port));
if (socket.Connected)
{
return socket;
}
}
throw new SocketException();
}
}
}

View File

@ -0,0 +1,145 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ;
using ActiveMQ.Commands;
using ActiveMQ.OpenWire;
using ActiveMQ.Transport;
using System;
using System.Collections;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
namespace ActiveMQ.Transport.Tcp
{
public class TcpTransport : ITransport
{
private Socket socket;
private OpenWireFormat wireformat = new OpenWireFormat();
private BinaryReader socketReader;
private BinaryWriter socketWriter;
private Thread readThread;
private bool started;
volatile private bool closed;
public CommandHandler command;
public ExceptionHandler exception;
public TcpTransport(Socket socket)
{
this.socket = socket;
}
/// <summary>
/// Method Start
/// </summary>
public void Start()
{
if (!started)
{
if( command == null )
throw new InvalidOperationException ("command cannot be null when Start is called.");
if( exception == null )
throw new InvalidOperationException ("exception cannot be null when Start is called.");
started = true;
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
// lets send the wireformat we're using
Oneway(wireformat.WireFormatInfo);
}
}
public void Oneway(Command command)
{
wireformat.Marshal(command, socketWriter);
socketWriter.Flush();
}
public FutureResponse AsyncRequest(Command command)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
}
public Response Request(Command command)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
}
public void Dispose()
{
closed = true;
socket.Close();
readThread.Join();
socketWriter.Close();
socketReader.Close();
}
public void ReadLoop()
{
while (!closed)
{
try
{
Command command = (Command) wireformat.Unmarshal(socketReader);
this.command(this, command);
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception e)
{
this.exception(this,e);
}
}
}
// Implementation methods
public CommandHandler Command {
get { return command; }
set { this.command = value; }
}
public ExceptionHandler Exception {
get { return exception; }
set { this.exception = value; }
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Net;
using System.Net.Sockets;
using ActiveMQ.Transport;
namespace ActiveMQ.Transport.Tcp
{
public class TcpTransportFactory : ITransportFactory
{
public ITransport CreateTransport(Uri location) {
// Console.WriteLine("Opening socket to: " + host + " on port: " + port);
Socket socket = Connect(location.Host, location.Port);
ITransport rc = new TcpTransport(socket);
// TODO: use URI query string to enable the LoggingTransport
// rc = new LoggingTransport(rc);
rc = new ResponseCorrelator(rc);
rc = new MutexTransport(rc);
return rc;
}
protected Socket Connect(string host, int port)
{
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
IPHostEntry hostEntry = Dns.Resolve(host);
foreach (IPAddress address in hostEntry.AddressList)
{
Socket socket = new Socket(
address.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
socket.Connect(new IPEndPoint(address, port));
if (socket.Connected)
{
return socket;
}
}
throw new SocketException();
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using ActiveMQ.Transport;
using JMS;
using System;
/// <summary>
/// Used to implement a filter on the transport layer.
/// </summary>
namespace ActiveMQ.Transport
{
public class TransportFilter : ITransport
{
protected readonly ITransport next;
protected CommandHandler command;
protected ExceptionHandler exception;
public TransportFilter(ITransport next) {
this.next = next;
this.next.Command = new CommandHandler(OnCommand);
this.next.Exception = new ExceptionHandler(OnException);
}
protected virtual void OnCommand(ITransport sender, Command command) {
this.command(sender, command);
}
protected virtual void OnException(ITransport sender, Exception command) {
this.exception(sender, command);
}
/// <summary>
/// Method Oneway
/// </summary>
/// <param name="command">A Command</param>
public virtual void Oneway(Command command)
{
this.next.Oneway(command);
}
/// <summary>
/// Method AsyncRequest
/// </summary>
/// <returns>A FutureResponse</returns>
/// <param name="command">A Command</param>
public virtual FutureResponse AsyncRequest(Command command)
{
return this.next.AsyncRequest(command);
}
/// <summary>
/// Method Request
/// </summary>
/// <returns>A Response</returns>
/// <param name="command">A Command</param>
public virtual Response Request(Command command)
{
return this.next.Request(command);
}
/// <summary>
/// Method Start
/// </summary>
public virtual void Start()
{
if( command == null )
throw new InvalidOperationException ("command cannot be null when Start is called.");
if( exception == null )
throw new InvalidOperationException ("exception cannot be null when Start is called.");
this.next.Start();
}
/// <summary>
/// Method Dispose
/// </summary>
public virtual void Dispose()
{
this.next.Dispose();
}
public CommandHandler Command {
get { return command; }
set { this.command = value; }
}
public ExceptionHandler Exception {
get { return exception; }
set { this.exception = value; }
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
/// <summary>
/// Represents a connection failure.
/// </summary>
namespace JMS
{
public class ConnectionException : JMSException
{
public JMSException(string message) : base(message)
{
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
/// <summary>
/// Represents a JMS exception
/// </summary>
namespace JMS
{
public class JMSException : Exception
{
public JMSException(string message) : base(message)
{
}
}
}

View File

@ -164,7 +164,13 @@
<Compile Include="ActiveMQ\TransactionContext.cs"/>
<Compile Include="ActiveMQ\Transport\FutureResponse.cs"/>
<Compile Include="ActiveMQ\Transport\ITransport.cs"/>
<Compile Include="ActiveMQ\Transport\SocketTransport.cs"/>
<Compile Include="ActiveMQ\Transport\ITransportFactory.cs"/>
<Compile Include="ActiveMQ\Transport\LoggingTransport.cs"/>
<Compile Include="ActiveMQ\Transport\MutexTransport.cs"/>
<Compile Include="ActiveMQ\Transport\ResponseCorrelator.cs"/>
<Compile Include="ActiveMQ\Transport\Tcp\TcpTransport.cs"/>
<Compile Include="ActiveMQ\Transport\Tcp\TcpTransportFactory.cs"/>
<Compile Include="ActiveMQ\Transport\TransportFilter.cs"/>
<Compile Include="JMS\IBytesMessage.cs"/>
<Compile Include="JMS\IConnection.cs"/>
<Compile Include="JMS\IConnectionFactory.cs"/>

View File

@ -29,7 +29,7 @@ namespace ActiveMQ
Console.WriteLine("About to connect to ActiveMQ");
// START SNIPPET: demo
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:61616"));
using (IConnection connection = factory.CreateConnection())
{
Console.WriteLine("Created a connection!");

View File

@ -113,7 +113,7 @@ namespace JMS
}
protected virtual IConnectionFactory CreateConnectionFactory() {
return new ActiveMQ.ConnectionFactory("localhost", 61616);
return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616"));
}
protected virtual IConnection CreateConnection()