added a rough implementation of the ITransport interface; needs more work on the concurrent processing side of things though

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@368086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-01-11 18:58:57 +00:00
parent af9f61015d
commit f62ff2c1bf
8 changed files with 173 additions and 9 deletions

View File

@ -47,6 +47,7 @@ namespace OpenWire.Client {
session.Dispose();
}
sessions.Clear();
transport.Dispose();
closed = true;
}

View File

@ -78,8 +78,7 @@ namespace OpenWire.Client {
}
protected ITransport CreateITransport() {
// TODO
return null;
return new SocketTransport(host, port);
}
}
}

View File

@ -6,5 +6,17 @@ namespace OpenWire.Client.Core {
/// An OpenWire command
/// </summary>
public interface Command : DataStructure {
/* TODO
short CommandId {
get;
set;
}
bool ResponseRequired {
get;
set;
}
*/
}
}

View File

@ -32,12 +32,18 @@ namespace OpenWire.Client.Core {
}
public Response Response {
get { return response; }
get {
// TODO use the proper .Net version of notify/wait()
while (response == null) {
Thread.Sleep(100);
}
return response;
}
set {
asyncWaitHandle.WaitOne();
response = value;
isCompleted = true;
asyncWaitHandle.ReleaseMutex();
asyncWaitHandle.ReleaseMutex();
}
}
}

View File

@ -12,7 +12,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
public interface ITransport {
public interface ITransport : IDisposable {
void Oneway(Command command);
FutureResponse AsyncRequest(Command command);

View File

@ -0,0 +1,144 @@
using System;
using System.Collections;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
public class SocketTransport : ITransport {
private readonly object transmissionLock = new object();
private readonly Socket socket;
private readonly BinaryReader socketReader;
private readonly BinaryWriter socketWriter;
private readonly Thread readThread;
private bool closed;
private IDictionary requestMap = new Hashtable(); // TODO threadsafe
private short nextCommandId;
public event CommandHandler Command;
public event ExceptionHandler Exception;
public SocketTransport(string host, int port) {
Console.WriteLine("Opening socket to: " + host + " on port: " + port);
socket = Connect(host, port);
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
}
public void Oneway(Command command) {
BaseCommand baseCommand = (BaseCommand) command;
baseCommand.CommandId = GetNextCommandId();
baseCommand.ResponseRequired = false;
Send(command);
}
public FutureResponse AsyncRequest(Command command) {
BaseCommand baseCommand = (BaseCommand) command;
baseCommand.CommandId = GetNextCommandId();
baseCommand.ResponseRequired = true;
Send(command);
FutureResponse future = new FutureResponse();
requestMap[baseCommand.CommandId] = future;
return future;
}
public Response Request(Command command) {
FutureResponse response = AsyncRequest(command);
return response.Response;
}
public void Dispose() {
Console.WriteLine("Closing the socket");
lock (transmissionLock) {
socket.Close();
closed = true;
}
socketWriter.Close();
socketReader.Close();
}
public void ReadLoop() {
Console.WriteLine("Starting to read commands from ActiveMQ");
while (!closed) {
BaseCommand command = null;
try {
command = (BaseCommand) CommandMarshallerRegistry.ReadCommand(socketReader);
} catch (ObjectDisposedException e) {
// stream closed
break;
}
if (command is Response) {
Console.WriteLine("Received response!: " + command);
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap[response.CommandId];
if (future != null) {
if (response is ExceptionResponse) {
ExceptionResponse er = (ExceptionResponse) response;
if (this.Exception != null) {
Exception e = new BrokerException(er.Exception);
this.Exception(this, e);
}
} else {
future.Response = response;
}
} else {
Console.WriteLine("Unknown response ID: " + response.CommandId);
}
} else {
if (this.Command != null) {
this.Command(this, command);
}
}
}
}
// Implementation methods
protected void Send(Command command) {
lock (transmissionLock) {
CommandMarshallerRegistry.WriteCommand(command, socketWriter);
socketWriter.Flush();
}
}
protected short GetNextCommandId() {
lock (transmissionLock) {
return++nextCommandId;
}
}
protected Socket Connect(string host, int port) {
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
IPHostEntry hostEntry = Dns.Resolve(host);
foreach (IPAddress address in hostEntry.AddressList) {
Socket socket = new Socket(
address.AddressFamily,
SocketType.Stream,
ProtocolType.Tcp);
socket.Connect(new IPEndPoint(address, port));
if (socket.Connected) {
return socket;
}
}
throw new SocketException();
}
}
}

View File

@ -17,10 +17,15 @@ namespace OpenWire.Client {
Assert.IsTrue(factory != null, "created valid factory: " + factory);
Console.WriteLine("Worked!");
/*
using (IConnection connection = factory.CreateConnection()) {
ISession session = connection.CreateSession();
Console.WriteLine("Created a session: " + session);
IDestination destination = session.GetQueue("FOO.BAR");
Assert.IsTrue(destination != null, "No queue available!");
Console.WriteLine("Using destination: " + destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageProducer producer = session.CreateProducer(destination);
@ -32,7 +37,6 @@ namespace OpenWire.Client {
Assert.AreEqual(expected, message.Text);
}
*/
}
}
}

View File

@ -29,8 +29,6 @@
<include name="log4net.dll" />
<include name="${build.dir}/bin/${project.name}.dll" />
</references>
<resources failonempty="false" basedir="Resources"
dynamicprefix="true" prefix="XML:">
<include name="**/*.xml" />