Get it to compile after refactor

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-04 08:18:29 +00:00
parent eb6e421ff6
commit 1701ac7fa5
17 changed files with 1906 additions and 1922 deletions

View File

@ -15,11 +15,10 @@
* limitations under the License.
*/
using System;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using ActiveMQ.OpenWire;
using ActiveMQ.OpenWire.Commands;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Summary description for AbstractCommand.

View File

@ -17,11 +17,11 @@
using System;
using System.IO;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
using ActiveMQ.OpenWire;
using ActiveMQ.OpenWire.Commands;
namespace ActiveMQ.OpenWire
namespace OpenWire.Client.Core
{
/// <summary>
/// Represents a stream of boolean flags

View File

@ -16,10 +16,10 @@
*/
using System;
using System.Text;
using OpenWire.Client.Core;
using System.IO;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
public struct StackTraceElement
{

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
using System;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
/// <summary>
/// An OpenWire command

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
using System;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace ActiveMQ.OpenWire
{
/// <summary>
/// An OpenWire command
/// </summary>

View File

@ -1,39 +1,40 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for DataStructureSupport.
/// </summary>
public abstract class DataStructureSupport : DataStructure {
protected DataStructureSupport() {
}
public virtual byte GetDataStructureType() {
return 0;
}
public virtual bool IsMarshallAware() {
return false;
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using ActiveMQ.OpenWire;
using ActiveMQ.OpenWire.Commands;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Summary description for DataStructureSupport.
/// </summary>
public abstract class DataStructureSupport : DataStructure {
protected DataStructureSupport() {
}
public virtual byte GetDataStructureType() {
return 0;
}
public virtual bool IsMarshallAware() {
return false;
}
}
}

View File

@ -1,124 +1,124 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections;
using OpenWire.Client.Commands;
using System;
using OpenWire.Client;
using System.Threading;
namespace OpenWire.Client.Core
{
/// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers
/// </summary>
public class Dispatcher
{
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
public void RedeliverRolledBackMessages() {
lock (semaphore)
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver) {
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
while (queue.Count > 0)
{
ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
replacement.Enqueue(element);
}
queue = replacement;
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Redeliver the given message, putting it at the head of the queue
/// </summary>
public void Redeliver(ActiveMQMessage message)
{
lock (semaphore) {
messagesToRedeliver.Add(message);
}
}
/// <summary>
/// Method Enqueue
/// </summary>
public void Enqueue(ActiveMQMessage message)
{
lock (semaphore)
{
queue.Enqueue(message);
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Method DequeueNoWait
/// </summary>
public IMessage DequeueNoWait()
{
lock (semaphore)
{
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue(int timeout)
{
lock (semaphore)
{
if (queue.Count == 0)
{
Monitor.Wait(semaphore, timeout);
}
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue()
{
lock (semaphore)
{
return (IMessage) queue.Dequeue();
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections;
using System;
using System.Threading;
using ActiveMQ.OpenWire.Commands;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers
/// </summary>
public class Dispatcher
{
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
public void RedeliverRolledBackMessages() {
lock (semaphore)
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver) {
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
while (queue.Count > 0)
{
ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
replacement.Enqueue(element);
}
queue = replacement;
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Redeliver the given message, putting it at the head of the queue
/// </summary>
public void Redeliver(ActiveMQMessage message)
{
lock (semaphore) {
messagesToRedeliver.Add(message);
}
}
/// <summary>
/// Method Enqueue
/// </summary>
public void Enqueue(ActiveMQMessage message)
{
lock (semaphore)
{
queue.Enqueue(message);
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Method DequeueNoWait
/// </summary>
public IMessage DequeueNoWait()
{
lock (semaphore)
{
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue(int timeout)
{
lock (semaphore)
{
if (queue.Count == 0)
{
Monitor.Wait(semaphore, timeout);
}
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue()
{
lock (semaphore)
{
return (IMessage) queue.Dequeue();
}
}
}
}

View File

@ -17,10 +17,9 @@
using System;
using System.Threading;
using OpenWire.Client;
using OpenWire.Client.Commands;
using ActiveMQ.OpenWire.Commands;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Handles asynchronous responses

View File

@ -1,36 +1,36 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace OpenWire.Client.Core
{
public interface ISynchronization
{
/// <summary>
/// Called before a commit
/// </summary>
void BeforeCommit();
/// <summary>
/// Called after a commit
/// </summary>
void AfterCommit();
/// <summary>
/// Called after a transaction rollback
/// </summary>
void AfterRollback();
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace ActiveMQ.OpenWire
{
public interface ISynchronization
{
/// <summary>
/// Called before a commit
/// </summary>
void BeforeCommit();
/// <summary>
/// Called after a commit
/// </summary>
void AfterCommit();
/// <summary>
/// Called after a transaction rollback
/// </summary>
void AfterRollback();
}
}

View File

@ -16,11 +16,9 @@
*/
using System;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using ActiveMQ.OpenWire.Commands;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
public delegate void CommandHandler(ITransport sender, Command command);

View File

@ -1,39 +1,36 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core {
/// <summary>
/// Represents a marshallable entity
/// </summary>
public interface MarshallAware {
void BeforeMarshall(OpenWireFormat wireFormat);
void AfterMarshall(OpenWireFormat wireFormat);
void BeforeUnmarshall(OpenWireFormat wireFormat);
void AfterUnmarshall(OpenWireFormat wireFormat);
void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data);
byte[] GetMarshalledForm(OpenWireFormat wireFormat);
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Represents a marshallable entity
/// </summary>
public interface MarshallAware {
void BeforeMarshall(OpenWireFormat wireFormat);
void AfterMarshall(OpenWireFormat wireFormat);
void BeforeUnmarshall(OpenWireFormat wireFormat);
void AfterUnmarshall(OpenWireFormat wireFormat);
void SetMarshalledForm(OpenWireFormat wireFormat, byte[] data);
byte[] GetMarshalledForm(OpenWireFormat wireFormat);
}
}

View File

@ -1,57 +1,55 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
using OpenWire.Client.Commands;
namespace OpenWire.Client.Core
{
public delegate object PropertyGetter(ActiveMQMessage message);
public delegate void PropertySetter(ActiveMQMessage message, object value);
public class MessagePropertyHelper
{
private IDictionary setters = new Hashtable();
private IDictionary getters = new Hashtable();
public MessagePropertyHelper()
{
// TODO find all of the JMS properties via introspection
}
public object GetObjectProperty(ActiveMQMessage message, string name) {
object getter = getters[name];
if (getter != null) {
}
return message.Properties[name];
}
public void SetObjectProperty(ActiveMQMessage message, string name, object value) {
PropertySetter setter = (PropertySetter) setters[name];
if (setter != null) {
setter(message, value);
}
else {
message.Properties[name] = value;
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using ActiveMQ.OpenWire.Commands;
namespace ActiveMQ.OpenWire
{
public delegate object PropertyGetter(ActiveMQMessage message);
public delegate void PropertySetter(ActiveMQMessage message, object value);
public class MessagePropertyHelper
{
private IDictionary setters = new Hashtable();
private IDictionary getters = new Hashtable();
public MessagePropertyHelper()
{
// TODO find all of the JMS properties via introspection
}
public object GetObjectProperty(ActiveMQMessage message, string name) {
object getter = getters[name];
if (getter != null) {
}
return message.Properties[name];
}
public void SetObjectProperty(ActiveMQMessage message, string name, object value) {
PropertySetter setter = (PropertySetter) setters[name];
if (setter != null) {
setter(message, value);
}
else {
message.Properties[name] = value;
}
}
}
}

View File

@ -1,224 +1,223 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core
{
/// <summary>
/// Represents the wire format
/// </summary>
public class OpenWireFormat
{
static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
private BaseDataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
public OpenWireFormat()
{
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
wireFormatInfo.StackTraceEnabled = true;
wireFormatInfo.TcpNoDelayEnabled = true;
wireFormatInfo.PrefixPacketSize = true;
wireFormatInfo.TightEncodingEnabled = true;
dataMarshallers = new BaseDataStreamMarshaller[256];
MarshallerFactory factory = new MarshallerFactory();
factory.configure(this);
}
public WireFormatInfo WireFormatInfo {
get {
return wireFormatInfo;
}
}
public bool StackTraceEnabled {
get {
return wireFormatInfo.StackTraceEnabled;
}
}
public void addMarshaller(BaseDataStreamMarshaller marshaller)
{
byte type = marshaller.GetDataStructureType();
dataMarshallers[type & 0xFF] = marshaller;
}
public void Marshal(Object o, BinaryWriter ds)
{
int size = 1;
if (o != null)
{
DataStructure c = (DataStructure) o;
byte type = c.GetDataStructureType();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
BooleanStream bs = new BooleanStream();
size += dsm.TightMarshal1(this, c, bs);
size += bs.MarshalledSize();
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(type, ds);
bs.Marshal(ds);
dsm.TightMarshal2(this, c, ds, bs);
}
else
{
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(NULL_TYPE, ds);
}
}
public Object Unmarshal(BinaryReader dis)
{
// lets ignore the size of the packet
BaseDataStreamMarshaller.ReadInt(dis);
// first byte is the type of the packet
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
if (dataType != NULL_TYPE)
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
//Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
Object data = dsm.CreateObject();
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
dsm.TightUnmarshal(this, data, dis, bs);
return data;
}
else
{
return null;
}
}
public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
{
bs.WriteBoolean(o != null);
if (o == null)
return 0;
if (o.IsMarshallAware())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
bs.WriteBoolean(sequence != null);
if (sequence != null)
{
return 1 + sequence.Length;
}
}
byte type = o.GetDataStructureType();
if (type == 0) {
throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
}
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
//Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
return 1 + dsm.TightMarshal1(this, o, bs);
}
public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
{
if (!bs.ReadBoolean())
return ;
byte type = o.GetDataStructureType();
BaseDataStreamMarshaller.WriteByte(type, ds);
if (o.IsMarshallAware() && bs.ReadBoolean())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
ds.Write(sequence, 0, sequence.Length);
}
else
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
dsm.TightMarshal2(this, o, ds, bs);
}
}
public DataStructure TightUnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
{
if (bs.ReadBoolean())
{
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
DataStructure data = dsm.CreateObject();
if (data.IsMarshallAware() && bs.ReadBoolean())
{
BaseDataStreamMarshaller.ReadInt(dis);
BaseDataStreamMarshaller.ReadByte(dis);
BooleanStream bs2 = new BooleanStream();
bs2.Unmarshal(dis);
dsm.TightUnmarshal(this, data, dis, bs2);
// TODO: extract the sequence from the dis and associate it.
// MarshallAware ma = (MarshallAware)data
// ma.setCachedMarshalledForm(this, sequence);
}
else
{
dsm.TightUnmarshal(this, data, dis, bs);
}
return data;
}
else
{
return null;
}
}
/// <summary>
/// Method CreateMagicBytes
/// </summary>
private byte[] CreateMagicBytes()
{
byte[] answer = new byte[MAGIC.Length];
for (int i = 0; i < answer.Length; i++)
{
answer[i] = (byte) MAGIC[i];
}
return answer;
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using ActiveMQ.OpenWire.Commands;
using ActiveMQ.OpenWire.V1;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Represents the wire format
/// </summary>
public class OpenWireFormat
{
static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
private BaseDataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
public OpenWireFormat()
{
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
wireFormatInfo.StackTraceEnabled = true;
wireFormatInfo.TcpNoDelayEnabled = true;
wireFormatInfo.PrefixPacketSize = true;
wireFormatInfo.TightEncodingEnabled = true;
dataMarshallers = new BaseDataStreamMarshaller[256];
MarshallerFactory factory = new MarshallerFactory();
factory.configure(this);
}
public WireFormatInfo WireFormatInfo {
get {
return wireFormatInfo;
}
}
public bool StackTraceEnabled {
get {
return wireFormatInfo.StackTraceEnabled;
}
}
public void addMarshaller(BaseDataStreamMarshaller marshaller)
{
byte type = marshaller.GetDataStructureType();
dataMarshallers[type & 0xFF] = marshaller;
}
public void Marshal(Object o, BinaryWriter ds)
{
int size = 1;
if (o != null)
{
DataStructure c = (DataStructure) o;
byte type = c.GetDataStructureType();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
BooleanStream bs = new BooleanStream();
size += dsm.TightMarshal1(this, c, bs);
size += bs.MarshalledSize();
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(type, ds);
bs.Marshal(ds);
dsm.TightMarshal2(this, c, ds, bs);
}
else
{
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(NULL_TYPE, ds);
}
}
public Object Unmarshal(BinaryReader dis)
{
// lets ignore the size of the packet
BaseDataStreamMarshaller.ReadInt(dis);
// first byte is the type of the packet
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
if (dataType != NULL_TYPE)
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
//Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
Object data = dsm.CreateObject();
BooleanStream bs = new BooleanStream();
bs.Unmarshal(dis);
dsm.TightUnmarshal(this, data, dis, bs);
return data;
}
else
{
return null;
}
}
public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
{
bs.WriteBoolean(o != null);
if (o == null)
return 0;
if (o.IsMarshallAware())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
bs.WriteBoolean(sequence != null);
if (sequence != null)
{
return 1 + sequence.Length;
}
}
byte type = o.GetDataStructureType();
if (type == 0) {
throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
}
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
//Console.WriteLine("Marshalling type: " + type + " with structure: " + o);
return 1 + dsm.TightMarshal1(this, o, bs);
}
public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
{
if (!bs.ReadBoolean())
return ;
byte type = o.GetDataStructureType();
BaseDataStreamMarshaller.WriteByte(type, ds);
if (o.IsMarshallAware() && bs.ReadBoolean())
{
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
ds.Write(sequence, 0, sequence.Length);
}
else
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
dsm.TightMarshal2(this, o, ds, bs);
}
}
public DataStructure TightUnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
{
if (bs.ReadBoolean())
{
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
DataStructure data = dsm.CreateObject();
if (data.IsMarshallAware() && bs.ReadBoolean())
{
BaseDataStreamMarshaller.ReadInt(dis);
BaseDataStreamMarshaller.ReadByte(dis);
BooleanStream bs2 = new BooleanStream();
bs2.Unmarshal(dis);
dsm.TightUnmarshal(this, data, dis, bs2);
// TODO: extract the sequence from the dis and associate it.
// MarshallAware ma = (MarshallAware)data
// ma.setCachedMarshalledForm(this, sequence);
}
else
{
dsm.TightUnmarshal(this, data, dis, bs);
}
return data;
}
else
{
return null;
}
}
/// <summary>
/// Method CreateMagicBytes
/// </summary>
private byte[] CreateMagicBytes()
{
byte[] answer = new byte[MAGIC.Length];
for (int i = 0; i < answer.Length; i++)
{
answer[i] = (byte) MAGIC[i];
}
return answer;
}
}
}

View File

@ -1,241 +1,238 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core
{
/// <summary>
/// A default implementation of IPrimitiveMap
/// </summary>
public class PrimitiveMap : IPrimitiveMap
{
private IDictionary dictionary = new Hashtable();
/// <summary>
/// Unmarshalls the map from the given data or if the data is null just
/// return an empty map
/// </summary>
public static PrimitiveMap Unmarshal(byte[] data)
{
PrimitiveMap answer = new PrimitiveMap();
answer.dictionary = BaseDataStreamMarshaller.UnmarshalPrimitiveMap(data);
return answer;
}
public byte[] Marshal()
{
return BaseDataStreamMarshaller.MarshalPrimitiveMap(dictionary);
}
public void Clear()
{
dictionary.Clear();
}
public bool Contains(Object key)
{
return dictionary.Contains(key);
}
public void Remove(Object key)
{
dictionary.Remove(key);
}
public int Count
{
get {
return dictionary.Count;
}
}
public ICollection Keys
{
get {
return dictionary.Keys;
}
}
public ICollection Values
{
get {
return dictionary.Values;
}
}
public object this[string key]
{
get {
return GetValue(key);
}
set {
CheckValidType(value);
SetValue(key, value);
}
}
public string GetString(string key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(string));
return (string) value;
}
public void SetString(string key, string value)
{
SetValue(key, value);
}
public bool GetBool(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(bool));
return (bool) value;
}
public void SetByte(String key, bool value)
{
SetValue(key, value);
}
public byte GetByte(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(byte));
return (byte) value;
}
public void SetByte(String key, byte value)
{
SetValue(key, value);
}
public char GetChar(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(char));
return (char) value;
}
public void SetChar(String key, char value)
{
SetValue(key, value);
}
public short GetShort(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(short));
return (short) value;
}
public void SetShort(String key, short value)
{
SetValue(key, value);
}
public int GetInt(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(int));
return (int) value;
}
public void SetInt(String key, int value)
{
SetValue(key, value);
}
public long GetLong(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(long));
return (long) value;
}
public void SetLong(String key, long value)
{
SetValue(key, value);
}
public float GetFloat(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(float));
return (float) value;
}
public void SetFloat(String key, float value)
{
SetValue(key, value);
}
public double GetDouble(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(double));
return (double) value;
}
public void SetDouble(String key, double value)
{
SetValue(key, value);
}
protected virtual void SetValue(String key, Object value)
{
dictionary[key] = value;
}
protected virtual Object GetValue(String key)
{
return dictionary[key];
}
protected virtual void CheckValueType(Object value, Type type)
{
if (! type.IsInstanceOfType(value))
{
throw new OpenWireException("Expected type: " + type.Name + " but was: " + value);
}
}
protected virtual void CheckValidType(Object value)
{
if (value != null)
{
Type type = value.GetType();
if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string)))
{
throw new OpenWireException("Invalid type: " + type.Name + " for value: " + value);
}
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// A default implementation of IPrimitiveMap
/// </summary>
public class PrimitiveMap : IPrimitiveMap
{
private IDictionary dictionary = new Hashtable();
/// <summary>
/// Unmarshalls the map from the given data or if the data is null just
/// return an empty map
/// </summary>
public static PrimitiveMap Unmarshal(byte[] data)
{
PrimitiveMap answer = new PrimitiveMap();
answer.dictionary = BaseDataStreamMarshaller.UnmarshalPrimitiveMap(data);
return answer;
}
public byte[] Marshal()
{
return BaseDataStreamMarshaller.MarshalPrimitiveMap(dictionary);
}
public void Clear()
{
dictionary.Clear();
}
public bool Contains(Object key)
{
return dictionary.Contains(key);
}
public void Remove(Object key)
{
dictionary.Remove(key);
}
public int Count
{
get {
return dictionary.Count;
}
}
public ICollection Keys
{
get {
return dictionary.Keys;
}
}
public ICollection Values
{
get {
return dictionary.Values;
}
}
public object this[string key]
{
get {
return GetValue(key);
}
set {
CheckValidType(value);
SetValue(key, value);
}
}
public string GetString(string key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(string));
return (string) value;
}
public void SetString(string key, string value)
{
SetValue(key, value);
}
public bool GetBool(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(bool));
return (bool) value;
}
public void SetByte(String key, bool value)
{
SetValue(key, value);
}
public byte GetByte(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(byte));
return (byte) value;
}
public void SetByte(String key, byte value)
{
SetValue(key, value);
}
public char GetChar(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(char));
return (char) value;
}
public void SetChar(String key, char value)
{
SetValue(key, value);
}
public short GetShort(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(short));
return (short) value;
}
public void SetShort(String key, short value)
{
SetValue(key, value);
}
public int GetInt(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(int));
return (int) value;
}
public void SetInt(String key, int value)
{
SetValue(key, value);
}
public long GetLong(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(long));
return (long) value;
}
public void SetLong(String key, long value)
{
SetValue(key, value);
}
public float GetFloat(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(float));
return (float) value;
}
public void SetFloat(String key, float value)
{
SetValue(key, value);
}
public double GetDouble(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(double));
return (double) value;
}
public void SetDouble(String key, double value)
{
SetValue(key, value);
}
protected virtual void SetValue(String key, Object value)
{
dictionary[key] = value;
}
protected virtual Object GetValue(String key)
{
return dictionary[key];
}
protected virtual void CheckValueType(Object value, Type type)
{
if (! type.IsInstanceOfType(value))
{
throw new OpenWireException("Expected type: " + type.Name + " but was: " + value);
}
}
protected virtual void CheckValidType(Object value)
{
if (value != null)
{
Type type = value.GetType();
if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string)))
{
throw new OpenWireException("Invalid type: " + type.Name + " for value: " + value);
}
}
}
}
}

View File

@ -22,13 +22,9 @@ using System.Net.Sockets;
using System.Text;
using System.Threading;
using ActiveMQ.OpenWire.Commands;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
using OpenWire.Client.IO;
namespace OpenWire.Client.Core
namespace ActiveMQ.OpenWire
{
/// <summary>

View File

@ -1,110 +1,110 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections;
using OpenWire.Client.Commands;
using System;
using OpenWire.Client;
namespace OpenWire.Client.Core
{
public enum TransactionType
{
Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7
}
public class TransactionContext
{
private TransactionId transactionId;
private Session session;
private ArrayList synchronizations = new ArrayList();
public TransactionContext(Session session) {
this.session = session;
}
public TransactionId TransactionId
{
get { return transactionId; }
}
/// <summary>
/// Method AddSynchronization
/// </summary>
public void AddSynchronization(ISynchronization synchronization)
{
synchronizations.Add(synchronization);
}
public void Begin()
{
if (transactionId == null)
{
transactionId = session.Connection.CreateLocalTransactionId();
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Begin;
session.Connection.OneWay(info);
}
}
public void Rollback()
{
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Rollback;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterRollback();
}
synchronizations.Clear();
}
public void Commit()
{
foreach (ISynchronization synchronization in synchronizations) {
synchronization.BeforeCommit();
}
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.CommitOnePhase;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterCommit();
}
synchronizations.Clear();
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections;
using System;
using ActiveMQ.OpenWire.Commands;
namespace ActiveMQ.OpenWire
{
public enum TransactionType
{
Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7
}
public class TransactionContext
{
private TransactionId transactionId;
private Session session;
private ArrayList synchronizations = new ArrayList();
public TransactionContext(Session session) {
this.session = session;
}
public TransactionId TransactionId
{
get { return transactionId; }
}
/// <summary>
/// Method AddSynchronization
/// </summary>
public void AddSynchronization(ISynchronization synchronization)
{
synchronizations.Add(synchronization);
}
public void Begin()
{
if (transactionId == null)
{
transactionId = session.Connection.CreateLocalTransactionId();
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Begin;
session.Connection.OneWay(info);
}
}
public void Rollback()
{
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Rollback;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterRollback();
}
synchronizations.Clear();
}
public void Commit()
{
foreach (ISynchronization synchronization in synchronizations) {
synchronization.BeforeCommit();
}
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.CommitOnePhase;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterCommit();
}
synchronizations.Clear();
}
}
}