mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-06 09:20:07 +00:00
This closes #1259
This commit is contained in:
commit
80e4c09435
216
examples/protocols/amqp/dotnet/DurableSubscriptions/Program.cs
Executable file
216
examples/protocols/amqp/dotnet/DurableSubscriptions/Program.cs
Executable file
@ -0,0 +1,216 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using Amqp;
|
||||||
|
using Amqp.Framing;
|
||||||
|
using Amqp.Types;
|
||||||
|
using Amqp.Sasl;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
|
namespace aorg.apache.activemq.examples
|
||||||
|
{
|
||||||
|
class Program
|
||||||
|
{
|
||||||
|
private static string DEFAULT_BROKER_URI = "amqp://localhost:5672";
|
||||||
|
private static string DEFAULT_CONTAINER_ID = "client-1";
|
||||||
|
private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription";
|
||||||
|
private static string DEFAULT_TOPIC_NAME = "test-topic";
|
||||||
|
|
||||||
|
static void Main(string[] args)
|
||||||
|
{
|
||||||
|
Console.WriteLine("Starting AMQP durable consumer example.");
|
||||||
|
|
||||||
|
Console.WriteLine("Creating a Durable Subscription");
|
||||||
|
CreateDurableSubscription();
|
||||||
|
|
||||||
|
Console.WriteLine("Attempting to recover a Durable Subscription");
|
||||||
|
RecoverDurableSubscription();
|
||||||
|
|
||||||
|
Console.WriteLine("Unsubscribe a durable subscription");
|
||||||
|
UnsubscribeDurableSubscription();
|
||||||
|
|
||||||
|
Console.WriteLine("Attempting to recover a non-existent durable subscription");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
RecoverDurableSubscription();
|
||||||
|
throw new Exception("Subscription was not deleted.");
|
||||||
|
}
|
||||||
|
catch (AmqpException)
|
||||||
|
{
|
||||||
|
Console.WriteLine("Recover failed as expected");
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine("Example Complete.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creating a durable subscription involves creating a Receiver with a Source that
|
||||||
|
// has the address set to the Topic name where the client wants to subscribe along
|
||||||
|
// with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the
|
||||||
|
// Distribution Mode set to 'Copy'. The link name of the Receiver represents the
|
||||||
|
// desired name of the Subscription and of course the Connection must carry a container
|
||||||
|
// ID uniqure to the client that is creating the subscription.
|
||||||
|
private static void CreateDurableSubscription()
|
||||||
|
{
|
||||||
|
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||||
|
SaslProfile.Anonymous,
|
||||||
|
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Session session = new Session(connection);
|
||||||
|
|
||||||
|
Source source = CreateBasicSource();
|
||||||
|
|
||||||
|
// Create a Durable Consumer Source.
|
||||||
|
source.Address = DEFAULT_TOPIC_NAME;
|
||||||
|
source.ExpiryPolicy = new Symbol("never");
|
||||||
|
source.Durable = 2;
|
||||||
|
source.DistributionMode = new Symbol("copy");
|
||||||
|
|
||||||
|
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null);
|
||||||
|
|
||||||
|
session.Close();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
connection.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recovering an existing subscription allows the client to ask the remote
|
||||||
|
// peer if a subscription with the given name for the current 'Container ID'
|
||||||
|
// exists. The process involves the client attaching a receiver with a null
|
||||||
|
// Source on a link with the desired subscription name as the link name and
|
||||||
|
// the broker will then return a Source instance if this current container
|
||||||
|
// has a subscription registered with that subscription (link) name.
|
||||||
|
private static void RecoverDurableSubscription()
|
||||||
|
{
|
||||||
|
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||||
|
SaslProfile.Anonymous,
|
||||||
|
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Session session = new Session(connection);
|
||||||
|
Source recoveredSource = null;
|
||||||
|
ManualResetEvent attached = new ManualResetEvent(false);
|
||||||
|
|
||||||
|
OnAttached onAttached = (link, attach) =>
|
||||||
|
{
|
||||||
|
recoveredSource = (Source) attach.Source;
|
||||||
|
attached.Set();
|
||||||
|
};
|
||||||
|
|
||||||
|
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);
|
||||||
|
|
||||||
|
attached.WaitOne(10000);
|
||||||
|
if (recoveredSource == null)
|
||||||
|
{
|
||||||
|
// The remote had no subscription matching what we asked for.
|
||||||
|
throw new AmqpException(new Error());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
|
||||||
|
Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
|
||||||
|
Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
|
||||||
|
Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.Close();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
connection.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribing a durable subscription involves recovering an existing
|
||||||
|
// subscription and then closing the receiver link explicitly or in AMQP
|
||||||
|
// terms the close value of the Detach frame should be 'true'
|
||||||
|
private static void UnsubscribeDurableSubscription()
|
||||||
|
{
|
||||||
|
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||||
|
SaslProfile.Anonymous,
|
||||||
|
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Session session = new Session(connection);
|
||||||
|
Source recoveredSource = null;
|
||||||
|
ManualResetEvent attached = new ManualResetEvent(false);
|
||||||
|
|
||||||
|
OnAttached onAttached = (link, attach) =>
|
||||||
|
{
|
||||||
|
recoveredSource = (Source) attach.Source;
|
||||||
|
attached.Set();
|
||||||
|
};
|
||||||
|
|
||||||
|
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);
|
||||||
|
|
||||||
|
attached.WaitOne(10000);
|
||||||
|
if (recoveredSource == null)
|
||||||
|
{
|
||||||
|
// The remote had no subscription matching what we asked for.
|
||||||
|
throw new AmqpException(new Error());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
|
||||||
|
Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
|
||||||
|
Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
|
||||||
|
Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Closing the Receiver vs. detaching it will unsubscribe
|
||||||
|
receiver.Close();
|
||||||
|
|
||||||
|
session.Close();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
connection.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a basic Source type that contains common attributes needed
|
||||||
|
// to describe to the remote peer the features and expectations of the
|
||||||
|
// Source of the Receiver link.
|
||||||
|
private static Source CreateBasicSource()
|
||||||
|
{
|
||||||
|
Source source = new Source();
|
||||||
|
|
||||||
|
// These are the outcomes this link will accept.
|
||||||
|
Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"),
|
||||||
|
new Symbol("amqp:rejected:list"),
|
||||||
|
new Symbol("amqp:released:list"),
|
||||||
|
new Symbol("amqp:modified:list") };
|
||||||
|
|
||||||
|
// Default Outcome for deliveries not settled on this link
|
||||||
|
Modified defaultOutcome = new Modified();
|
||||||
|
defaultOutcome.DeliveryFailed = true;
|
||||||
|
defaultOutcome.UndeliverableHere = false;
|
||||||
|
|
||||||
|
// Configure Source.
|
||||||
|
source.DefaultOutcome = defaultOutcome;
|
||||||
|
source.Outcomes = outcomes;
|
||||||
|
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
25
examples/protocols/amqp/dotnet/DurableSubscriptions/amqp-durables.csproj
Executable file
25
examples/protocols/amqp/dotnet/DurableSubscriptions/amqp-durables.csproj
Executable file
@ -0,0 +1,25 @@
|
|||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>netcoreapp1.1</TargetFramework>
|
||||||
|
</PropertyGroup>
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="AMQPNetLite.Core" Version="2.0.0-pre" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
@ -0,0 +1,54 @@
|
|||||||
|
# Running the .NET AMQP example
|
||||||
|
|
||||||
|
|
||||||
|
# Pre-requisites:
|
||||||
|
|
||||||
|
All of this can be done on Linux, Mac and... Windows
|
||||||
|
|
||||||
|
- Install .NET
|
||||||
|
|
||||||
|
https://www.microsoft.com/net/core
|
||||||
|
|
||||||
|
|
||||||
|
- Visual Studio Code is free and may be useful:
|
||||||
|
|
||||||
|
https://code.visualstudio.com
|
||||||
|
|
||||||
|
|
||||||
|
- Powershell might be also useful:
|
||||||
|
|
||||||
|
https://github.com/PowerShell/PowerShell/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# running the example
|
||||||
|
|
||||||
|
- Create the broker, by running:
|
||||||
|
|
||||||
|
```
|
||||||
|
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||||
|
./server1/bin/artemis-service start
|
||||||
|
```
|
||||||
|
|
||||||
|
Or simply use the start-broker.sh script on this directory
|
||||||
|
|
||||||
|
- Create an Topic to subscribe to
|
||||||
|
|
||||||
|
../../../../../bin/artemis queue create --name test-topic --auto-create-address --multicast --preserve-on-no-consumers --durable --address test-topic
|
||||||
|
|
||||||
|
- Compile the code
|
||||||
|
|
||||||
|
You need call restore to download AMQP Library and build it.
|
||||||
|
Restore is part of NuGET which is sort of the Maven Repo for Java devs.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
||||||
|
```
|
||||||
|
|
||||||
|
Or simply use the run-example.sh script on this directory
|
||||||
|
|
||||||
|
- Debugging
|
||||||
|
|
||||||
|
Visual Studio Code will make it fairly easy to do it
|
26
examples/protocols/amqp/dotnet/DurableSubscriptions/run-example.sh
Executable file
26
examples/protocols/amqp/dotnet/DurableSubscriptions/run-example.sh
Executable file
@ -0,0 +1,26 @@
|
|||||||
|
#!/usr/bin/env sh
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
echo "Make sure you run start-broker.sh first"
|
||||||
|
|
||||||
|
# Use this as an example for creating a Topic to subscribe to
|
||||||
|
../../../../../bin/artemis queue create --name test-topic --auto-create-address --multicast --preserve-on-no-consumers --durable --address test-topic
|
||||||
|
|
||||||
|
dotnet restore
|
||||||
|
dotnet build
|
||||||
|
dotnet run
|
22
examples/protocols/amqp/dotnet/DurableSubscriptions/start-broker.sh
Executable file
22
examples/protocols/amqp/dotnet/DurableSubscriptions/start-broker.sh
Executable file
@ -0,0 +1,22 @@
|
|||||||
|
#!/usr/bin/env sh
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
# if you are using a source distribution you will have to create and start the broker manually
|
||||||
|
# Use this as a reference!
|
||||||
|
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||||
|
./server1/bin/artemis-service start
|
@ -26,7 +26,7 @@ https://github.com/PowerShell/PowerShell/
|
|||||||
- Create the broker, by running:
|
- Create the broker, by running:
|
||||||
|
|
||||||
```
|
```
|
||||||
../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||||
./server1/bin/artemis-service start
|
./server1/bin/artemis-service start
|
||||||
```
|
```
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user