/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using Amqp; using Amqp.Framing; using Amqp.Types; using Amqp.Sasl; using System.Threading; namespace aorg.apache.activemq.examples { class Program { private static string DEFAULT_BROKER_URI = "amqp://localhost:5672"; private static string DEFAULT_CONTAINER_ID = "client-1"; private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription"; private static string DEFAULT_TOPIC_NAME = "test-topic"; static void Main(string[] args) { Console.WriteLine("Starting AMQP durable consumer example."); Console.WriteLine("Creating a Durable Subscription"); CreateDurableSubscription(); Console.WriteLine("Attempting to recover a Durable Subscription"); RecoverDurableSubscription(); Console.WriteLine("Unsubscribe a durable subscription"); UnsubscribeDurableSubscription(); Console.WriteLine("Attempting to recover a non-existent durable subscription"); try { RecoverDurableSubscription(); throw new Exception("Subscription was not deleted."); } catch (AmqpException) { Console.WriteLine("Recover failed as expected"); } Console.WriteLine("Example Complete."); } // Creating a durable subscription involves creating a Receiver with a Source that // has the address set to the Topic name where the client wants to subscribe along // with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the // Distribution Mode set to 'Copy'. The link name of the Receiver represents the // desired name of the Subscription and of course the Connection must carry a container // ID uniqure to the client that is creating the subscription. private static void CreateDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source source = CreateBasicSource(); // Create a Durable Consumer Source. source.Address = DEFAULT_TOPIC_NAME; source.ExpiryPolicy = new Symbol("never"); source.Durable = 2; source.DistributionMode = new Symbol("copy"); ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null); session.Close(); } finally { connection.Close(); } } // Recovering an existing subscription allows the client to ask the remote // peer if a subscription with the given name for the current 'Container ID' // exists. The process involves the client attaching a receiver with a null // Source on a link with the desired subscription name as the link name and // the broker will then return a Source instance if this current container // has a subscription registered with that subscription (link) name. private static void RecoverDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } session.Close(); } finally { connection.Close(); } } // Unsubscribing a durable subscription involves recovering an existing // subscription and then closing the receiver link explicitly or in AMQP // terms the close value of the Detach frame should be 'true' private static void UnsubscribeDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } // Closing the Receiver vs. detaching it will unsubscribe receiver.Close(); session.Close(); } finally { connection.Close(); } } // Creates a basic Source type that contains common attributes needed // to describe to the remote peer the features and expectations of the // Source of the Receiver link. private static Source CreateBasicSource() { Source source = new Source(); // These are the outcomes this link will accept. Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"), new Symbol("amqp:rejected:list"), new Symbol("amqp:released:list"), new Symbol("amqp:modified:list") }; // Default Outcome for deliveries not settled on this link Modified defaultOutcome = new Modified(); defaultOutcome.DeliveryFailed = true; defaultOutcome.UndeliverableHere = false; // Configure Source. source.DefaultOutcome = defaultOutcome; source.Outcomes = outcomes; return source; } } }