ARTEMIS-1153 Adds new .NET client example for durable subscriptions
Adds an example for managing durable subscriptions using the .NET AmqpNetLite client that subscribes, looks up and removes a durable topic subscription.
This commit is contained in:
parent
ce61d20f5a
commit
6f34622cd9
|
@ -0,0 +1,216 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
using System;
|
||||
using Amqp;
|
||||
using Amqp.Framing;
|
||||
using Amqp.Types;
|
||||
using Amqp.Sasl;
|
||||
using System.Threading;
|
||||
|
||||
namespace aorg.apache.activemq.examples
|
||||
{
|
||||
class Program
|
||||
{
|
||||
private static string DEFAULT_BROKER_URI = "amqp://localhost:5672";
|
||||
private static string DEFAULT_CONTAINER_ID = "client-1";
|
||||
private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription";
|
||||
private static string DEFAULT_TOPIC_NAME = "test-topic";
|
||||
|
||||
static void Main(string[] args)
|
||||
{
|
||||
Console.WriteLine("Starting AMQP durable consumer example.");
|
||||
|
||||
Console.WriteLine("Creating a Durable Subscription");
|
||||
CreateDurableSubscription();
|
||||
|
||||
Console.WriteLine("Attempting to recover a Durable Subscription");
|
||||
RecoverDurableSubscription();
|
||||
|
||||
Console.WriteLine("Unsubscribe a durable subscription");
|
||||
UnsubscribeDurableSubscription();
|
||||
|
||||
Console.WriteLine("Attempting to recover a non-existent durable subscription");
|
||||
try
|
||||
{
|
||||
RecoverDurableSubscription();
|
||||
throw new Exception("Subscription was not deleted.");
|
||||
}
|
||||
catch (AmqpException)
|
||||
{
|
||||
Console.WriteLine("Recover failed as expected");
|
||||
}
|
||||
|
||||
Console.WriteLine("Example Complete.");
|
||||
}
|
||||
|
||||
// Creating a durable subscription involves creating a Receiver with a Source that
|
||||
// has the address set to the Topic name where the client wants to subscribe along
|
||||
// with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the
|
||||
// Distribution Mode set to 'Copy'. The link name of the Receiver represents the
|
||||
// desired name of the Subscription and of course the Connection must carry a container
|
||||
// ID uniqure to the client that is creating the subscription.
|
||||
private static void CreateDurableSubscription()
|
||||
{
|
||||
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||
SaslProfile.Anonymous,
|
||||
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||
|
||||
try
|
||||
{
|
||||
Session session = new Session(connection);
|
||||
|
||||
Source source = CreateBasicSource();
|
||||
|
||||
// Create a Durable Consumer Source.
|
||||
source.Address = DEFAULT_TOPIC_NAME;
|
||||
source.ExpiryPolicy = new Symbol("never");
|
||||
source.Durable = 2;
|
||||
source.DistributionMode = new Symbol("copy");
|
||||
|
||||
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null);
|
||||
|
||||
session.Close();
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Close();
|
||||
}
|
||||
}
|
||||
|
||||
// Recovering an existing subscription allows the client to ask the remote
|
||||
// peer if a subscription with the given name for the current 'Container ID'
|
||||
// exists. The process involves the client attaching a receiver with a null
|
||||
// Source on a link with the desired subscription name as the link name and
|
||||
// the broker will then return a Source instance if this current container
|
||||
// has a subscription registered with that subscription (link) name.
|
||||
private static void RecoverDurableSubscription()
|
||||
{
|
||||
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||
SaslProfile.Anonymous,
|
||||
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||
|
||||
try
|
||||
{
|
||||
Session session = new Session(connection);
|
||||
Source recoveredSource = null;
|
||||
ManualResetEvent attached = new ManualResetEvent(false);
|
||||
|
||||
OnAttached onAttached = (link, attach) =>
|
||||
{
|
||||
recoveredSource = (Source) attach.Source;
|
||||
attached.Set();
|
||||
};
|
||||
|
||||
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);
|
||||
|
||||
attached.WaitOne(10000);
|
||||
if (recoveredSource == null)
|
||||
{
|
||||
// The remote had no subscription matching what we asked for.
|
||||
throw new AmqpException(new Error());
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
|
||||
Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
|
||||
Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
|
||||
Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
|
||||
}
|
||||
|
||||
session.Close();
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Close();
|
||||
}
|
||||
}
|
||||
|
||||
// Unsubscribing a durable subscription involves recovering an existing
|
||||
// subscription and then closing the receiver link explicitly or in AMQP
|
||||
// terms the close value of the Detach frame should be 'true'
|
||||
private static void UnsubscribeDurableSubscription()
|
||||
{
|
||||
Connection connection = new Connection(new Address(DEFAULT_BROKER_URI),
|
||||
SaslProfile.Anonymous,
|
||||
new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null);
|
||||
|
||||
try
|
||||
{
|
||||
Session session = new Session(connection);
|
||||
Source recoveredSource = null;
|
||||
ManualResetEvent attached = new ManualResetEvent(false);
|
||||
|
||||
OnAttached onAttached = (link, attach) =>
|
||||
{
|
||||
recoveredSource = (Source) attach.Source;
|
||||
attached.Set();
|
||||
};
|
||||
|
||||
ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached);
|
||||
|
||||
attached.WaitOne(10000);
|
||||
if (recoveredSource == null)
|
||||
{
|
||||
// The remote had no subscription matching what we asked for.
|
||||
throw new AmqpException(new Error());
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address);
|
||||
Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy);
|
||||
Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable);
|
||||
Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode);
|
||||
}
|
||||
|
||||
// Closing the Receiver vs. detaching it will unsubscribe
|
||||
receiver.Close();
|
||||
|
||||
session.Close();
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.Close();
|
||||
}
|
||||
}
|
||||
|
||||
// Creates a basic Source type that contains common attributes needed
|
||||
// to describe to the remote peer the features and expectations of the
|
||||
// Source of the Receiver link.
|
||||
private static Source CreateBasicSource()
|
||||
{
|
||||
Source source = new Source();
|
||||
|
||||
// These are the outcomes this link will accept.
|
||||
Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"),
|
||||
new Symbol("amqp:rejected:list"),
|
||||
new Symbol("amqp:released:list"),
|
||||
new Symbol("amqp:modified:list") };
|
||||
|
||||
// Default Outcome for deliveries not settled on this link
|
||||
Modified defaultOutcome = new Modified();
|
||||
defaultOutcome.DeliveryFailed = true;
|
||||
defaultOutcome.UndeliverableHere = false;
|
||||
|
||||
// Configure Source.
|
||||
source.DefaultOutcome = defaultOutcome;
|
||||
source.Outcomes = outcomes;
|
||||
|
||||
return source;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>netcoreapp1.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="AMQPNetLite.Core" Version="2.0.0-pre" />
|
||||
</ItemGroup>
|
||||
</Project>
|
|
@ -0,0 +1,54 @@
|
|||
# Running the .NET AMQP example
|
||||
|
||||
|
||||
# Pre-requisites:
|
||||
|
||||
All of this can be done on Linux, Mac and... Windows
|
||||
|
||||
- Install .NET
|
||||
|
||||
https://www.microsoft.com/net/core
|
||||
|
||||
|
||||
- Visual Studio Code is free and may be useful:
|
||||
|
||||
https://code.visualstudio.com
|
||||
|
||||
|
||||
- Powershell might be also useful:
|
||||
|
||||
https://github.com/PowerShell/PowerShell/
|
||||
|
||||
|
||||
|
||||
# running the example
|
||||
|
||||
- Create the broker, by running:
|
||||
|
||||
```
|
||||
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||
./server1/bin/artemis-service start
|
||||
```
|
||||
|
||||
Or simply use the start-broker.sh script on this directory
|
||||
|
||||
- Create an Topic to subscribe to
|
||||
|
||||
../../../../../bin/artemis queue create --name test-topic --auto-create-address --multicast --preserve-on-no-consumers --durable --address test-topic
|
||||
|
||||
- Compile the code
|
||||
|
||||
You need call restore to download AMQP Library and build it.
|
||||
Restore is part of NuGET which is sort of the Maven Repo for Java devs.
|
||||
|
||||
```sh
|
||||
dotnet restore
|
||||
dotnet build
|
||||
dotnet run
|
||||
```
|
||||
|
||||
Or simply use the run-example.sh script on this directory
|
||||
|
||||
- Debugging
|
||||
|
||||
Visual Studio Code will make it fairly easy to do it
|
|
@ -0,0 +1,26 @@
|
|||
#!/usr/bin/env sh
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
echo "Make sure you run start-broker.sh first"
|
||||
|
||||
# Use this as an example for creating a Topic to subscribe to
|
||||
../../../../../bin/artemis queue create --name test-topic --auto-create-address --multicast --preserve-on-no-consumers --durable --address test-topic
|
||||
|
||||
dotnet restore
|
||||
dotnet build
|
||||
dotnet run
|
|
@ -0,0 +1,22 @@
|
|||
#!/usr/bin/env sh
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# if you are using a source distribution you will have to create and start the broker manually
|
||||
# Use this as a reference!
|
||||
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||
./server1/bin/artemis-service start
|
|
@ -26,7 +26,7 @@ https://github.com/PowerShell/PowerShell/
|
|||
- Create the broker, by running:
|
||||
|
||||
```
|
||||
../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
|
||||
./server1/bin/artemis-service start
|
||||
```
|
||||
|
Loading…
Reference in New Issue