API updates.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384955 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-10 23:44:24 +00:00
parent fd2fa9da5c
commit 4727a0f452
10 changed files with 1336 additions and 1308 deletions

View File

@ -1,220 +1,220 @@
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003" DefaultTargets="Build">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{08321F42-4B3D-4815-B592-95962BAC3B9F}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>activemq-dotnet</RootNamespace>
<AssemblyName>activemq-dotnet</AssemblyName>
<WarningLevel>4</WarningLevel>
<StartupObject/>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugSymbols>false</DebugSymbols>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
<Reference Include="nunit.framework"/>
<Reference Include="System"/>
<Reference Include="System.Data"/>
<Reference Include="System.Xml"/>
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets"/>
<ItemGroup>
<Compile Include="src\main\csharp\ActiveMQ\BrokerException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\AbstractCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQBytesMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQDestination.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQMapMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQObjectMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQQueue.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQStreamMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempDestination.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempQueue.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempTopic.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTextMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTopic.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BaseCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BooleanExpression.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerError.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Command.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionError.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConsumerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConsumerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ControlCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataArrayResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataStructure.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataStructureSupport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DestinationInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DiscoveryEvent.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ExceptionResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\FlushCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\IntegerResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalQueueAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTopicAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTrace.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTransaction.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\KeepAliveInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\LocalTransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MarshallAware.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Message.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageDispatch.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageDispatchNotification.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageReference.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\NetworkBridgeFilter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ProducerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ProducerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\RemoveInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\RemoveSubscriptionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ReplayCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Response.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SessionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SessionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ShutdownInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SubscriptionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\TransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\TransactionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\WireFormatInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\XATransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Xid.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Connection.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConnectionClosedException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConnectionFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConsumerClosedException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\DestinationFilter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Dispatcher.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ISynchronization.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\MessageConsumer.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\MessageProducer.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\BaseDataStreamMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\BooleanStream.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\EndianSupport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\MessagePropertyHelper.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireBinaryReader.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireBinaryWriter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireFormat.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\PrimitiveMap.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQBytesMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQDestinationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQMapMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQObjectMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQQueueMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQStreamMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempDestinationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempQueueMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempTopicMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTextMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTopicMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BaseCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BrokerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BrokerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionErrorMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConsumerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConsumerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ControlCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataArrayResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataStructureSupportMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DestinationInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DiscoveryEventMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ExceptionResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\FlushCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\IntegerResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalQueueAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTopicAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTraceMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTransactionMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\KeepAliveInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\LocalTransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MarshallerFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageDispatchMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageDispatchNotificationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\NetworkBridgeFilterMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ProducerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ProducerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\RemoveInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\RemoveSubscriptionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ReplayCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SessionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SessionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ShutdownInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SubscriptionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\TransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\TransactionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\WireFormatInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\XATransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Session.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\TransactionContext.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\FutureResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ITransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ITransportFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\LoggingTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\MutexTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ResponseCorrelator.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\Tcp\TcpTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\Tcp\TcpTransportFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\TransportFilter.cs"/>
<Compile Include="src\main\csharp\CommonAssemblyInfo.cs"/>
<Compile Include="src\main\csharp\NMS\IBytesMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IConnection.cs"/>
<Compile Include="src\main\csharp\NMS\IConnectionFactory.cs"/>
<Compile Include="src\main\csharp\NMS\IDestination.cs"/>
<Compile Include="src\main\csharp\NMS\IMapMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IMessageConsumer.cs"/>
<Compile Include="src\main\csharp\NMS\IMessageProducer.cs"/>
<Compile Include="src\main\csharp\NMS\IPrimitiveMap.cs"/>
<Compile Include="src\main\csharp\NMS\IQueue.cs"/>
<Compile Include="src\main\csharp\NMS\ISession.cs"/>
<Compile Include="src\main\csharp\NMS\IStartable.cs"/>
<Compile Include="src\main\csharp\NMS\IStoppable.cs"/>
<Compile Include="src\main\csharp\NMS\ITemporaryQueue.cs"/>
<Compile Include="src\main\csharp\NMS\ITemporaryTopic.cs"/>
<Compile Include="src\main\csharp\NMS\ITextMessage.cs"/>
<Compile Include="src\main\csharp\NMS\ITopic.cs"/>
<Compile Include="src\main\csharp\NMS\NMSConnectionException.cs"/>
<Compile Include="src\main\csharp\NMS\NMSException.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\Commands\CommandTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\OpenWire\BooleanStreamTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\OpenWire\EndianTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\TestMain.cs"/>
<Compile Include="src\test\csharp\NMS\AsyncConsumeTest.cs"/>
<Compile Include="src\test\csharp\NMS\BadConsumeTest.cs"/>
<Compile Include="src\test\csharp\NMS\BytesMessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\ConsumerTest.cs"/>
<Compile Include="src\test\csharp\NMS\JMSPropertyTest.cs"/>
<Compile Include="src\test\csharp\NMS\JMSTestSupport.cs"/>
<Compile Include="src\test\csharp\NMS\MapMessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\MessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\TextMessage.cs"/>
<Compile Include="src\test\csharp\NMS\TransactionTest.cs"/>
</ItemGroup>
</Project>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003" DefaultTargets="Build">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{08321F42-4B3D-4815-B592-95962BAC3B9F}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>activemq-dotnet</RootNamespace>
<AssemblyName>activemq-dotnet</AssemblyName>
<WarningLevel>4</WarningLevel>
<StartupObject/>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugSymbols>false</DebugSymbols>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
<Reference Include="nunit.framework"/>
<Reference Include="System"/>
<Reference Include="System.Data"/>
<Reference Include="System.Xml"/>
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets"/>
<ItemGroup>
<Compile Include="src\main\csharp\ActiveMQ\BrokerException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\AbstractCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQBytesMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQDestination.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQMapMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQObjectMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQQueue.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQStreamMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempDestination.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempQueue.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTempTopic.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTextMessage.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ActiveMQTopic.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BaseCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BooleanExpression.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerError.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\BrokerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Command.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionError.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConnectionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConsumerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ConsumerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ControlCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataArrayResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataStructure.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DataStructureSupport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DestinationInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\DiscoveryEvent.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ExceptionResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\FlushCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\IntegerResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalQueueAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTopicAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTrace.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\JournalTransaction.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\KeepAliveInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\LocalTransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MarshallAware.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Message.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageAck.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageDispatch.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageDispatchNotification.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\MessageReference.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\NetworkBridgeFilter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ProducerId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ProducerInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\RemoveInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\RemoveSubscriptionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ReplayCommand.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Response.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SessionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SessionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\ShutdownInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\SubscriptionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\TransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\TransactionInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\WireFormatInfo.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\XATransactionId.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Commands\Xid.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Connection.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConnectionClosedException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConnectionFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ConsumerClosedException.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\DestinationFilter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Dispatcher.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\ISynchronization.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\MessageConsumer.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\MessageProducer.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\BaseDataStreamMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\BooleanStream.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\EndianSupport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\MessagePropertyHelper.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireBinaryReader.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireBinaryWriter.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\OpenWireFormat.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\PrimitiveMap.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQBytesMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQDestinationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQMapMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQObjectMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQQueueMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQStreamMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempDestinationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempQueueMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTempTopicMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTextMessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ActiveMQTopicMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BaseCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BrokerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\BrokerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionErrorMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConnectionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConsumerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ConsumerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ControlCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataArrayResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DataStructureSupportMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DestinationInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\DiscoveryEventMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ExceptionResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\FlushCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\IntegerResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalQueueAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTopicAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTraceMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\JournalTransactionMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\KeepAliveInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\LocalTransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MarshallerFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageAckMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageDispatchMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageDispatchNotificationMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\MessageMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\NetworkBridgeFilterMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ProducerIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ProducerInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\RemoveInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\RemoveSubscriptionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ReplayCommandMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ResponseMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SessionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SessionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\ShutdownInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\SubscriptionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\TransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\TransactionInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\WireFormatInfoMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\OpenWire\V1\XATransactionIdMarshaller.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Session.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\TransactionContext.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\FutureResponse.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ITransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ITransportFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\LoggingTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\MutexTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\ResponseCorrelator.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\Tcp\TcpTransport.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\Tcp\TcpTransportFactory.cs"/>
<Compile Include="src\main\csharp\ActiveMQ\Transport\TransportFilter.cs"/>
<Compile Include="src\main\csharp\CommonAssemblyInfo.cs"/>
<Compile Include="src\main\csharp\NMS\IBytesMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IConnection.cs"/>
<Compile Include="src\main\csharp\NMS\IConnectionFactory.cs"/>
<Compile Include="src\main\csharp\NMS\IDestination.cs"/>
<Compile Include="src\main\csharp\NMS\IMapMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IMessage.cs"/>
<Compile Include="src\main\csharp\NMS\IMessageConsumer.cs"/>
<Compile Include="src\main\csharp\NMS\IMessageProducer.cs"/>
<Compile Include="src\main\csharp\NMS\IPrimitiveMap.cs"/>
<Compile Include="src\main\csharp\NMS\IQueue.cs"/>
<Compile Include="src\main\csharp\NMS\ISession.cs"/>
<Compile Include="src\main\csharp\NMS\IStartable.cs"/>
<Compile Include="src\main\csharp\NMS\IStoppable.cs"/>
<Compile Include="src\main\csharp\NMS\ITemporaryQueue.cs"/>
<Compile Include="src\main\csharp\NMS\ITemporaryTopic.cs"/>
<Compile Include="src\main\csharp\NMS\ITextMessage.cs"/>
<Compile Include="src\main\csharp\NMS\ITopic.cs"/>
<Compile Include="src\main\csharp\NMS\NMSConnectionException.cs"/>
<Compile Include="src\main\csharp\NMS\NMSException.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\Commands\CommandTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\OpenWire\BooleanStreamTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\OpenWire\EndianTest.cs"/>
<Compile Include="src\test\csharp\ActiveMQ\TestMain.cs"/>
<Compile Include="src\test\csharp\NMS\AsyncConsumeTest.cs"/>
<Compile Include="src\test\csharp\NMS\BadConsumeTest.cs"/>
<Compile Include="src\test\csharp\NMS\BytesMessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\ConsumerTest.cs"/>
<Compile Include="src\test\csharp\NMS\JMSPropertyTest.cs"/>
<Compile Include="src\test\csharp\NMS\JMSTestSupport.cs"/>
<Compile Include="src\test\csharp\NMS\MapMessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\MessageTest.cs"/>
<Compile Include="src\test\csharp\NMS\TextMessage.cs"/>
<Compile Include="src\test\csharp\NMS\TransactionTest.cs"/>
</ItemGroup>
</Project>

View File

@ -1,129 +1,129 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using NMS;
using System;
using System.Collections;
using System.Threading;
namespace ActiveMQ
{
/// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers
/// </summary>
public class Dispatcher
{
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
public void RedeliverRolledBackMessages()
{
lock (semaphore)
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver)
{
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
while (queue.Count > 0)
{
ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
replacement.Enqueue(element);
}
queue = replacement;
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Redeliver the given message, putting it at the head of the queue
/// </summary>
public void Redeliver(ActiveMQMessage message)
{
lock (semaphore)
{
messagesToRedeliver.Add(message);
}
}
/// <summary>
/// Method Enqueue
/// </summary>
public void Enqueue(ActiveMQMessage message)
{
lock (semaphore)
{
queue.Enqueue(message);
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Method DequeueNoWait
/// </summary>
public IMessage DequeueNoWait()
{
lock (semaphore)
{
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue(int timeout)
{
lock (semaphore)
{
if (queue.Count == 0)
{
Monitor.Wait(semaphore, timeout);
}
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue()
{
lock (semaphore)
{
return (IMessage) queue.Dequeue();
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using NMS;
using System;
using System.Collections;
using System.Threading;
namespace ActiveMQ
{
/// <summary>
/// Handles the multi-threaded dispatching between the transport and the consumers
/// </summary>
public class Dispatcher
{
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
public void RedeliverRolledBackMessages()
{
lock (semaphore)
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver)
{
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
while (queue.Count > 0)
{
ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
replacement.Enqueue(element);
}
queue = replacement;
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Redeliver the given message, putting it at the head of the queue
/// </summary>
public void Redeliver(ActiveMQMessage message)
{
lock (semaphore)
{
messagesToRedeliver.Add(message);
}
}
/// <summary>
/// Method Enqueue
/// </summary>
public void Enqueue(ActiveMQMessage message)
{
lock (semaphore)
{
queue.Enqueue(message);
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Method DequeueNoWait
/// </summary>
public IMessage DequeueNoWait()
{
lock (semaphore)
{
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue(TimeSpan timeout)
{
lock (semaphore)
{
if (queue.Count == 0)
{
Monitor.Wait(semaphore, timeout);
}
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue()
{
lock (semaphore)
{
return (IMessage) queue.Dequeue();
}
}
}
}

View File

@ -97,7 +97,7 @@ namespace ActiveMQ
return AutoAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(int timeout)
public IMessage Receive(System.TimeSpan timeout)
{
CheckClosed();
return AutoAcknowledge(dispatcher.Dequeue(timeout));

View File

@ -1,239 +1,239 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using System;
using System.Collections;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// A default implementation of IPrimitiveMap
/// </summary>
public class PrimitiveMap : IPrimitiveMap
{
private IDictionary dictionary = new Hashtable();
/// <summary>
/// Unmarshalls the map from the given data or if the data is null just
/// return an empty map
/// </summary>
public static PrimitiveMap Unmarshal(byte[] data)
{
PrimitiveMap answer = new PrimitiveMap();
answer.dictionary = BaseDataStreamMarshaller.UnmarshalPrimitiveMap(data);
return answer;
}
public byte[] Marshal()
{
return BaseDataStreamMarshaller.MarshalPrimitiveMap(dictionary);
}
public void Clear()
{
dictionary.Clear();
}
public bool Contains(Object key)
{
return dictionary.Contains(key);
}
public void Remove(Object key)
{
dictionary.Remove(key);
}
public int Count
{
get {
return dictionary.Count;
}
}
public ICollection Keys
{
get {
return dictionary.Keys;
}
}
public ICollection Values
{
get {
return dictionary.Values;
}
}
public object this[string key]
{
get {
return GetValue(key);
}
set {
CheckValidType(value);
SetValue(key, value);
}
}
public string GetString(string key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(string));
return (string) value;
}
public void SetString(string key, string value)
{
SetValue(key, value);
}
public bool GetBool(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(bool));
return (bool) value;
}
public void SetByte(String key, bool value)
{
SetValue(key, value);
}
public byte GetByte(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(byte));
return (byte) value;
}
public void SetByte(String key, byte value)
{
SetValue(key, value);
}
public char GetChar(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(char));
return (char) value;
}
public void SetChar(String key, char value)
{
SetValue(key, value);
}
public short GetShort(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(short));
return (short) value;
}
public void SetShort(String key, short value)
{
SetValue(key, value);
}
public int GetInt(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(int));
return (int) value;
}
public void SetInt(String key, int value)
{
SetValue(key, value);
}
public long GetLong(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(long));
return (long) value;
}
public void SetLong(String key, long value)
{
SetValue(key, value);
}
public float GetFloat(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(float));
return (float) value;
}
public void SetFloat(String key, float value)
{
SetValue(key, value);
}
public double GetDouble(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(double));
return (double) value;
}
public void SetDouble(String key, double value)
{
SetValue(key, value);
}
protected virtual void SetValue(String key, Object value)
{
dictionary[key] = value;
}
protected virtual Object GetValue(String key)
{
return dictionary[key];
}
protected virtual void CheckValueType(Object value, Type type)
{
if (! type.IsInstanceOfType(value))
{
throw new NMSException("Expected type: " + type.Name + " but was: " + value);
}
}
protected virtual void CheckValidType(Object value)
{
if (value != null)
{
Type type = value.GetType();
if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string)))
{
throw new NMSException("Invalid type: " + type.Name + " for value: " + value);
}
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using System;
using System.Collections;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// A default implementation of IPrimitiveMap
/// </summary>
public class PrimitiveMap : IPrimitiveMap
{
private IDictionary dictionary = new Hashtable();
/// <summary>
/// Unmarshalls the map from the given data or if the data is null just
/// return an empty map
/// </summary>
public static PrimitiveMap Unmarshal(byte[] data)
{
PrimitiveMap answer = new PrimitiveMap();
answer.dictionary = BaseDataStreamMarshaller.UnmarshalPrimitiveMap(data);
return answer;
}
public byte[] Marshal()
{
return BaseDataStreamMarshaller.MarshalPrimitiveMap(dictionary);
}
public void Clear()
{
dictionary.Clear();
}
public bool Contains(Object key)
{
return dictionary.Contains(key);
}
public void Remove(Object key)
{
dictionary.Remove(key);
}
public int Count
{
get {
return dictionary.Count;
}
}
public ICollection Keys
{
get {
return dictionary.Keys;
}
}
public ICollection Values
{
get {
return dictionary.Values;
}
}
public object this[string key]
{
get {
return GetValue(key);
}
set {
CheckValidType(value);
SetValue(key, value);
}
}
public string GetString(string key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(string));
return (string) value;
}
public void SetString(string key, string value)
{
SetValue(key, value);
}
public bool GetBool(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(bool));
return (bool) value;
}
public void SetBool(String key, bool value)
{
SetValue(key, value);
}
public byte GetByte(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(byte));
return (byte) value;
}
public void SetByte(String key, byte value)
{
SetValue(key, value);
}
public char GetChar(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(char));
return (char) value;
}
public void SetChar(String key, char value)
{
SetValue(key, value);
}
public short GetShort(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(short));
return (short) value;
}
public void SetShort(String key, short value)
{
SetValue(key, value);
}
public int GetInt(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(int));
return (int) value;
}
public void SetInt(String key, int value)
{
SetValue(key, value);
}
public long GetLong(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(long));
return (long) value;
}
public void SetLong(String key, long value)
{
SetValue(key, value);
}
public float GetFloat(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(float));
return (float) value;
}
public void SetFloat(String key, float value)
{
SetValue(key, value);
}
public double GetDouble(String key)
{
Object value = GetValue(key);
CheckValueType(value, typeof(double));
return (double) value;
}
public void SetDouble(String key, double value)
{
SetValue(key, value);
}
protected virtual void SetValue(String key, Object value)
{
dictionary[key] = value;
}
protected virtual Object GetValue(String key)
{
return dictionary[key];
}
protected virtual void CheckValueType(Object value, Type type)
{
if (! type.IsInstanceOfType(value))
{
throw new NMSException("Expected type: " + type.Name + " but was: " + value);
}
}
protected virtual void CheckValidType(Object value)
{
if (value != null)
{
Type type = value.GetType();
if (! type.IsPrimitive && !type.IsValueType && !type.IsAssignableFrom(typeof(string)))
{
throw new NMSException("Invalid type: " + type.Name + " for value: " + value);
}
}
}
}
}

View File

@ -1,59 +1,88 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
public enum AcknowledgementMode
{
Unknown, AutoAcknowledge, ClientAcknowledge, Transactional
}
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public interface IConnection : System.IDisposable, IStartable, IStoppable
{
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession();
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession(AcknowledgementMode acknowledgementMode);
// Properties
AcknowledgementMode AcknowledgementMode
{
get;
set;
}
string ClientId
{
get;
set;
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
public enum AcknowledgementMode
{
/**
* With this acknowledgment mode, the session automatically
* acknowledges a client's receipt of a message either when
* the session has successfully returned from a call to receive
* or when the message listener the session has called to
* process the message successfully returns.
*/
AutoAcknowledge,
/**
* With this acknowledgment mode, the session automatically
* acknowledges a client's receipt of a message either when
* the session has successfully returned from a call to receive
* or when the message listener the session has called to
* process the message successfully returns. Acknowlegements
* may be delayed in this mode to increase performance at
* the cost of the message being redelivered this client fails.
*/
DupsOkAcknowledge,
/**
* With this acknowledgment mode, the client acknowledges a
* consumed message by calling the message's acknowledge method.
*/
ClientAcknowledge,
/**
* Messages will be consumed when the transaction commits.
*/
Transactional
}
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public interface IConnection : System.IDisposable, IStartable, IStoppable
{
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession();
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession(AcknowledgementMode acknowledgementMode);
// Properties
AcknowledgementMode AcknowledgementMode
{
get;
set;
}
string ClientId
{
get;
set;
}
}
}

View File

@ -1,52 +1,52 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
/// <summary>
/// A delegate that can receive messages async.
/// </summary>
public delegate void MessageListener(IMessage message);
/// <summary>
/// A consumer of messages
/// </summary>
public interface IMessageConsumer : System.IDisposable
{
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// If a message is available within the timeout duration it is returned otherwise this method returns null
/// </summary>
IMessage Receive(int timeout);
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null
/// </summary>
IMessage ReceiveNoWait();
/// <summary>
/// An asynchronous listener which can be used to consume messages asynchronously
/// </summary>
event MessageListener Listener;
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
/// <summary>
/// A delegate that can receive messages async.
/// </summary>
public delegate void MessageListener(IMessage message);
/// <summary>
/// A consumer of messages
/// </summary>
public interface IMessageConsumer : System.IDisposable
{
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// If a message is available within the timeout duration it is returned otherwise this method returns null
/// </summary>
IMessage Receive(System.TimeSpan timeout);
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null
/// </summary>
IMessage ReceiveNoWait();
/// <summary>
/// An asynchronous listener which can be used to consume messages asynchronously
/// </summary>
event MessageListener Listener;
}
}

View File

@ -1,84 +1,84 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
/// <summary>
/// Represents a Map of primitive types where the keys are all string instances
/// and the values are strings or numbers.
/// </summary>
public interface IPrimitiveMap
{
void Clear();
bool Contains(object key);
void Remove(object key);
int Count
{
get;
}
System.Collections.ICollection Keys
{
get;
}
System.Collections.ICollection Values
{
get;
}
object this[string key]
{
get;
set;
}
string GetString(string key);
void SetString(string key, string value);
bool GetBool(string key);
void SetByte(string key, bool value);
byte GetByte(string key);
void SetByte(string key, byte value);
char GetChar(string key);
void SetChar(string key, char value);
short GetShort(string key);
void SetShort(string key, short value);
int GetInt(string key);
void SetInt(string key, int value);
long GetLong(string key);
void SetLong(string key, long value);
float GetFloat(string key);
void SetFloat(string key, float value);
double GetDouble(string key);
void SetDouble(string key, double value);
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace NMS
{
/// <summary>
/// Represents a Map of primitive types where the keys are all string instances
/// and the values are strings or numbers.
/// </summary>
public interface IPrimitiveMap
{
void Clear();
bool Contains(object key);
void Remove(object key);
int Count
{
get;
}
System.Collections.ICollection Keys
{
get;
}
System.Collections.ICollection Values
{
get;
}
object this[string key]
{
get;
set;
}
string GetString(string key);
void SetString(string key, string value);
bool GetBool(string key);
void SetBool(string key, bool value);
byte GetByte(string key);
void SetByte(string key, byte value);
char GetChar(string key);
void SetChar(string key, char value);
short GetShort(string key);
void SetShort(string key, short value);
int GetInt(string key);
void SetInt(string key, int value);
long GetLong(string key);
void SetLong(string key, long value);
float GetFloat(string key);
void SetFloat(string key, float value);
double GetDouble(string key);
void SetDouble(string key, double value);
}
}

View File

@ -1,93 +1,92 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
namespace NMS
{
[TestFixture]
public class ConsumerTest : JMSTestSupport
{
[SetUp]
override public void SetUp()
{
base.SetUp();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
protected override IConnection CreateConnection()
{
IConnection connection = base.CreateConnection();
connection.ClientId = "test";
return connection;
}
protected override IDestination CreateDestination()
{
return session.GetTopic(CreateDestinationName());
}
//[Ignore("Not fully implemented yet.")]
[Test]
public void testDurableConsumerSelectorChange()
{
IMessageProducer producer = session.CreateProducer(Destination);
producer.Persistent = true;
IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='red'", false);
// Send the messages
ITextMessage message = session.CreateTextMessage("1st");
message.Properties["color"] = "red";
producer.Send(message);
IMessage m = consumer.Receive(receiveTimeout);
Assert.IsNotNull(m);
Assert.AreEqual("1st", ((ITextMessage)m).Text);
// Change the subscription.
consumer.Dispose();
consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='blue'", false);
message = session.CreateTextMessage("2nd");
message.Properties["color"] = "red";
producer.Send(message);
message = session.CreateTextMessage("3rd");
message.Properties["color"] = "blue";
producer.Send(message);
// Selector should skip the 2nd message.
m = consumer.Receive(1000);
Assert.IsNotNull(m);
Assert.AreEqual("3rd", ((ITextMessage)m).Text);
Assert.IsNull(consumer.ReceiveNoWait());
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
namespace NMS
{
[TestFixture]
public class ConsumerTest : JMSTestSupport
{
[SetUp]
override public void SetUp()
{
base.SetUp();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
protected override IConnection CreateConnection()
{
IConnection connection = base.CreateConnection();
connection.ClientId = "test";
return connection;
}
protected override IDestination CreateDestination()
{
return session.GetTopic(CreateDestinationName());
}
//[Ignore("Not fully implemented yet.")]
[Test]
public void testDurableConsumerSelectorChange()
{
IMessageProducer producer = session.CreateProducer(Destination);
producer.Persistent = true;
IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='red'", false);
// Send the messages
ITextMessage message = session.CreateTextMessage("1st");
message.Properties["color"] = "red";
producer.Send(message);
IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
Assert.IsNotNull(m);
Assert.AreEqual("1st", ((ITextMessage)m).Text);
// Change the subscription.
consumer.Dispose();
consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='blue'", false);
message = session.CreateTextMessage("2nd");
message.Properties["color"] = "red";
producer.Send(message);
message = session.CreateTextMessage("3rd");
message.Properties["color"] = "blue";
producer.Send(message);
// Selector should skip the 2nd message.
m = consumer.Receive(TimeSpan.FromMilliseconds(1000));
Assert.IsNotNull(m);
Assert.AreEqual("3rd", ((ITextMessage)m).Text);
Assert.IsNull(consumer.ReceiveNoWait());
}
}
}

View File

@ -1,178 +1,178 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
/// <summary>
/// useful base class for test cases
/// </summary>
namespace NMS
{
[ TestFixture ]
public abstract class JMSTestSupport
{
protected IConnectionFactory factory;
protected IConnection connection;
protected ISession session;
private IDestination destination;
protected int receiveTimeout = 1000;
protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
[SetUp]
virtual public void SetUp()
{
Connect();
}
[TearDown]
virtual public void TearDown()
{
Disconnect();
}
virtual protected void Connect()
{
Console.WriteLine("Connectting...");
factory = CreateConnectionFactory();
Assert.IsNotNull(factory, "no factory created");
connection = CreateConnection();
Assert.IsNotNull(connection, "no connection created");
connection.Start();
session = connection.CreateSession(acknowledgementMode);
Assert.IsNotNull(connection != null, "no session created");
Console.WriteLine("Connected.");
}
virtual protected void Disconnect()
{
if (connection != null)
{
Console.WriteLine("Disconnecting...");
connection.Dispose();
connection = null;
Console.WriteLine("Disconnected.");
}
}
virtual protected void Reconnect()
{
Disconnect();
Connect();
}
protected virtual void Drain()
{
using (ISession session = connection.CreateSession())
{
// Tries to consume any messages on the Destination
IMessageConsumer consumer = session.CreateConsumer(Destination);
// Should only need to wait for first message to arrive due to the way
// prefetching works.
IMessage msg = consumer.Receive(receiveTimeout);
while (msg != null)
{
msg = consumer.ReceiveNoWait();
}
}
}
public virtual void SendAndSyncReceive()
{
using (ISession session = connection.CreateSession())
{
IMessageConsumer consumer = session.CreateConsumer(Destination);
IMessageProducer producer = session.CreateProducer(Destination);
IMessage request = CreateMessage();
producer.Send(request);
IMessage message = consumer.Receive(receiveTimeout);
Assert.IsNotNull(message, "No message returned!");
AssertValidMessage(message);
}
}
protected virtual IConnectionFactory CreateConnectionFactory()
{
return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616"));
}
protected virtual IConnection CreateConnection()
{
return factory.CreateConnection();
}
protected virtual IMessageProducer CreateProducer()
{
IMessageProducer producer = session.CreateProducer(destination);
return producer;
}
protected virtual IMessageConsumer CreateConsumer()
{
IMessageConsumer consumer = session.CreateConsumer(destination);
return consumer;
}
protected virtual IDestination CreateDestination()
{
return session.GetQueue(CreateDestinationName());
}
protected virtual string CreateDestinationName()
{
return "Test.DotNet." + GetType().Name;
}
protected virtual IMessage CreateMessage()
{
return session.CreateMessage();
}
protected virtual void AssertValidMessage(IMessage message)
{
Assert.IsNotNull(message, "Null Message!");
}
public IDestination Destination
{
get {
if (destination == null)
{
destination = CreateDestination();
Assert.IsNotNull(destination, "No destination available!");
Console.WriteLine("Using destination: " + destination);
}
return destination;
}
set {
destination = value;
}
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
/// <summary>
/// useful base class for test cases
/// </summary>
namespace NMS
{
[ TestFixture ]
public abstract class JMSTestSupport
{
protected IConnectionFactory factory;
protected IConnection connection;
protected ISession session;
private IDestination destination;
protected int receiveTimeout = 1000;
protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
[SetUp]
virtual public void SetUp()
{
Connect();
}
[TearDown]
virtual public void TearDown()
{
Disconnect();
}
virtual protected void Connect()
{
Console.WriteLine("Connectting...");
factory = CreateConnectionFactory();
Assert.IsNotNull(factory, "no factory created");
connection = CreateConnection();
Assert.IsNotNull(connection, "no connection created");
connection.Start();
session = connection.CreateSession(acknowledgementMode);
Assert.IsNotNull(connection != null, "no session created");
Console.WriteLine("Connected.");
}
virtual protected void Disconnect()
{
if (connection != null)
{
Console.WriteLine("Disconnecting...");
connection.Dispose();
connection = null;
Console.WriteLine("Disconnected.");
}
}
virtual protected void Reconnect()
{
Disconnect();
Connect();
}
protected virtual void Drain()
{
using (ISession session = connection.CreateSession())
{
// Tries to consume any messages on the Destination
IMessageConsumer consumer = session.CreateConsumer(Destination);
// Should only need to wait for first message to arrive due to the way
// prefetching works.
IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
while (msg != null)
{
msg = consumer.ReceiveNoWait();
}
}
}
public virtual void SendAndSyncReceive()
{
using (ISession session = connection.CreateSession())
{
IMessageConsumer consumer = session.CreateConsumer(Destination);
IMessageProducer producer = session.CreateProducer(Destination);
IMessage request = CreateMessage();
producer.Send(request);
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
Assert.IsNotNull(message, "No message returned!");
AssertValidMessage(message);
}
}
protected virtual IConnectionFactory CreateConnectionFactory()
{
return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616"));
}
protected virtual IConnection CreateConnection()
{
return factory.CreateConnection();
}
protected virtual IMessageProducer CreateProducer()
{
IMessageProducer producer = session.CreateProducer(destination);
return producer;
}
protected virtual IMessageConsumer CreateConsumer()
{
IMessageConsumer consumer = session.CreateConsumer(destination);
return consumer;
}
protected virtual IDestination CreateDestination()
{
return session.GetQueue(CreateDestinationName());
}
protected virtual string CreateDestinationName()
{
return "Test.DotNet." + GetType().Name;
}
protected virtual IMessage CreateMessage()
{
return session.CreateMessage();
}
protected virtual void AssertValidMessage(IMessage message)
{
Assert.IsNotNull(message, "Null Message!");
}
public IDestination Destination
{
get {
if (destination == null)
{
destination = CreateDestination();
Assert.IsNotNull(destination, "No destination available!");
Console.WriteLine("Using destination: " + destination);
}
return destination;
}
set {
destination = value;
}
}
}
}

View File

@ -1,253 +1,253 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
using System.Collections;
namespace NMS
{
[TestFixture]
public class TransactionTest : JMSTestSupport
{
private static int destinationCounter;
IMessageProducer producer;
IMessageConsumer consumer;
[SetUp]
override public void SetUp()
{
acknowledgementMode = AcknowledgementMode.Transactional;
base.SetUp();
Drain();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
[Test]
public void TestSendRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
session.Rollback();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(1000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(4000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestSendSessionClose()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
consumer.Dispose();
session.Dispose();
Reconnect();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(1000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(4000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestReceiveRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sent both messages
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(1000);
messages.Add(message);
Assert.AreEqual(outbound[0], message);
session.Commit();
// rollback so we can get that last message again.
message = consumer.Receive(1000);
Assert.IsNotNull(message);
Assert.AreEqual(outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the message again!");
messages.Add(message);
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
[Test]
public void TestReceiveTwoThenRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(1000);
AssertTextMessageEqual("first mesage received before rollback", outbound[0], message);
message = consumer.Receive(1000);
Assert.IsNotNull(message);
AssertTextMessageEqual("second message received before rollback", outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the first message again!");
messages.Add(message);
AssertTextMessageEqual("first message received after rollback", outbound[0], message);
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the second message again!");
messages.Add(message);
AssertTextMessageEqual("second message received after rollback", outbound[1], message);
Assert.IsNull(consumer.ReceiveNoWait());
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
protected override string CreateDestinationName()
{
// TODO - how can we get the test name?
return base.CreateDestinationName() + (++destinationCounter);
}
protected void AssertTextMessagesEqual(String message, IMessage[] expected, IMessage[] actual)
{
Assert.AreEqual(expected.Length, actual.Length, "Incorrect number of messages received");
for (int i = 0; i < expected.Length; i++)
{
AssertTextMessageEqual(message + ". Index: " + i, expected[i], actual[i]);
}
}
protected void AssertTextMessageEqual(String message, IMessage expected, IMessage actual)
{
Assert.IsTrue(expected is ITextMessage, "expected object not a text message");
Assert.IsTrue(actual is ITextMessage, "actual object not a text message");
String expectedText = ((ITextMessage) expected).Text;
String actualText = ((ITextMessage) actual).Text;
Assert.AreEqual(expectedText, actualText, message);
}
protected override void Connect()
{
base.Connect();
consumer = session.CreateConsumer(Destination);
producer = session.CreateProducer(Destination);
}
}
}
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
using System.Collections;
namespace NMS
{
[TestFixture]
public class TransactionTest : JMSTestSupport
{
private static int destinationCounter;
IMessageProducer producer;
IMessageConsumer consumer;
[SetUp]
override public void SetUp()
{
acknowledgementMode = AcknowledgementMode.Transactional;
base.SetUp();
Drain();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
[Test]
public void TestSendRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
session.Rollback();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(TimeSpan.FromMilliseconds(4000));
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestSendSessionClose()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
consumer.Dispose();
session.Dispose();
Reconnect();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(TimeSpan.FromMilliseconds(4000));
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestReceiveRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sent both messages
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
messages.Add(message);
Assert.AreEqual(outbound[0], message);
session.Commit();
// rollback so we can get that last message again.
message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
Assert.IsNotNull(message);
Assert.AreEqual(outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
Assert.IsNotNull(message, "Should have re-received the message again!");
messages.Add(message);
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
[Test]
public void TestReceiveTwoThenRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
AssertTextMessageEqual("first mesage received before rollback", outbound[0], message);
message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
Assert.IsNotNull(message);
AssertTextMessageEqual("second message received before rollback", outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
Assert.IsNotNull(message, "Should have re-received the first message again!");
messages.Add(message);
AssertTextMessageEqual("first message received after rollback", outbound[0], message);
message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
Assert.IsNotNull(message, "Should have re-received the second message again!");
messages.Add(message);
AssertTextMessageEqual("second message received after rollback", outbound[1], message);
Assert.IsNull(consumer.ReceiveNoWait());
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
protected override string CreateDestinationName()
{
// TODO - how can we get the test name?
return base.CreateDestinationName() + (++destinationCounter);
}
protected void AssertTextMessagesEqual(String message, IMessage[] expected, IMessage[] actual)
{
Assert.AreEqual(expected.Length, actual.Length, "Incorrect number of messages received");
for (int i = 0; i < expected.Length; i++)
{
AssertTextMessageEqual(message + ". Index: " + i, expected[i], actual[i]);
}
}
protected void AssertTextMessageEqual(String message, IMessage expected, IMessage actual)
{
Assert.IsTrue(expected is ITextMessage, "expected object not a text message");
Assert.IsTrue(actual is ITextMessage, "actual object not a text message");
String expectedText = ((ITextMessage) expected).Text;
String actualText = ((ITextMessage) actual).Text;
Assert.AreEqual(expectedText, actualText, message);
}
protected override void Connect()
{
base.Connect();
consumer = session.CreateConsumer(Destination);
producer = session.CreateProducer(Destination);
}
}
}