diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs new file mode 100644 index 0000000000..c1a94b123d --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/App.cs @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.Threading; +using Amqp.Framing; +using Amqp; +using System.Threading.Tasks; + +namespace Artemis.Perf +{ + class App + { + static long ReceivedMessages = 0; + + static void TheCallback(int id, Session session, ReceiverLink link, Message message) + { + Interlocked.Increment(ref ReceivedMessages); + link.Accept(message); + } + + static void Main(string[] args) { + + if (args.Length == 0) { + args = new string[1]; + args[0] = "amqp://127.0.0.1:5672"; + } + + // it will start one client towards each server + for (int i = 0; i < args.Length; i++) { + string addr0 = args.Length >= 1 ? args[0] : "amqp://127.0.0.1:5672"; + processOn(addr0, "orders", 100000000, 25000, "p1"); + } + + while (true) { + long previousRead = Interlocked.Read(ref ReceivedMessages); + long previousSent = Interlocked.Read(ref Producer.totalSent); + Thread.Sleep(1000); + long currentRead = Interlocked.Read(ref ReceivedMessages); + long currentSent = Interlocked.Read(ref Producer.totalSent); + Console.WriteLine("Received: " + currentRead + " TotalSent: " + + currentSent); + Console.WriteLine("Rate reading: " + (currentRead - previousRead) + ", Rate sending: " + (currentSent - previousSent)); + } + + } + static void processOn(string addr, string queue, int totalSend, int maxRateSend, String processName) { + Address address = new Address(addr); + + Connection connection = new Connection(address); + + ReceiverPool pool = new ReceiverPool(connection, 1, queue, 200, TheCallback); + pool.start(); + + Producer Producer = new Producer(processName, addr, queue, totalSend, maxRateSend); + Producer.produce(); // this will start an asynchronous producer + + } + } +} diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj b/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj new file mode 100644 index 0000000000..99a2d1a2ff --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/HighPerformance.csproj @@ -0,0 +1,29 @@ + + + + + + Exe + netcoreapp2.0 + + + + + + + diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs new file mode 100644 index 0000000000..8e9cefe51a --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Amqp.Framing; +using Amqp; +using System.Threading.Tasks; + +namespace Artemis.Perf +{ + public class Producer + { + string name; + + string addr; + string queue; + int numberOfMessages; + int messagesPerSecond; + long messagesSent; + + public static long totalSent; + + public Producer(string name, string addr, string queue, int numberOfMessages, int messagesPerSecond) { + this.name = name; + this.addr = addr; + this.queue = queue; + this.numberOfMessages = numberOfMessages; + this.messagesPerSecond = messagesPerSecond; + } + + + public void produce() { + Address address = new Address(addr); + + Connection connection = new Connection(address); + + + Session session = new Session(connection); + SenderLink sender = new SenderLink(session, "sender", queue); + + OutcomeCallback callback = (l, msg, o, s) => { + Interlocked.Increment(ref messagesSent); + Interlocked.Increment(ref totalSent); + }; + + // This is just to limit the number of messages per second we are sending + TokenBucketLimiterImpl tokens = new TokenBucketLimiterImpl(messagesPerSecond); + + Task.Factory.StartNew(() => { + Console.WriteLine("Sending {0} messages...", numberOfMessages); + for (var i = 0; i < numberOfMessages; i++) + { + tokens.limit(); + Message message = new Message("a message!" + i); + message.Header = new Header(); + message.Header.Durable = true; + + // The callback here is to make the sending to happen as fast as possible + sender.Send(message, callback, null); + } + Console.WriteLine(".... Done sending"); + }, TaskCreationOptions.LongRunning); + + // Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error | + // TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning; + // Trace.TraceListener = (l, f, o) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o)); + + // sender.Close(); + + // Task.Factory.StartNew(() => { + // while (true) { + // Console.WriteLine("Sent " + Interlocked.Read(ref messagesSent) + " on queue " + queue + " producer " + this.name); + // Thread.Sleep(1000); + // } + // }, TaskCreationOptions.LongRunning); + + + + } + } +} diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs new file mode 100644 index 0000000000..dce841a279 --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/ReceiverPool.cs @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Amqp.Framing; +using Amqp; +using System.Threading.Tasks; + +namespace Artemis.Perf +{ + /** + * This class will start many consumers underneath it to satisfy a pool of consumers + * While calling a single callback for when messages are received. + */ + public class ReceiverPool + { + public delegate void MessageReceived(int id, Session session, ReceiverLink link, Message msg); + + public int MessagesReceived; + + MessageReceived _callback; + int _Workers; + private Object receiverLock = new Object(); + private Boolean running = true; + + private ReceiverLink[] Receivers; + private Session[] Sessions; + + private Connection _Connection; + + private int Credits; + + public ReceiverPool(Connection Connection, int Workers, String queue, int Credits, MessageReceived callback) + { + this._Connection = Connection; + this.Receivers = new ReceiverLink[Workers]; + this.Sessions = new Session[Workers]; + + + for (int i = 0; i < Workers; i++) + { + + // I was playing with using a single session versus multiple sessions + if (i == 0) { + Sessions[i] = new Session(Connection); + } + else { + Sessions[i] = Sessions[0]; + } + Receivers[i] = new ReceiverLink(Sessions[i], "receiver " + queue + " " + i, queue); + } + this._Workers = Workers; + this._callback = callback; + this.Credits = Credits; + } + + + public void stop() { + running = false; + for (int i = 0; i < _Workers; i++) { + Receivers[i].Close(); + Sessions[i].Close(); + } + } + + + public void start() { + for (int i = 0; i < _Workers; i++) { + { + // This variable exists otherwise we would get an olderValue of i + int value = i; + Task.Factory.StartNew(() => WorkerRun(value), TaskCreationOptions.LongRunning); + } + } + } + + void WorkerRun(int i) { + try { + Receivers[i].SetCredit(Credits); + while (running) + { + Message theMessage = Receivers[i].Receive(TimeSpan.FromSeconds(1)); + + if (theMessage != null) + { + _callback(i, Sessions[i], Receivers[i], theMessage); + } + } + } catch (Exception e) { + Console.WriteLine(e); + } + } + } + +} \ No newline at end of file diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs b/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs new file mode 100644 index 0000000000..2685e921ff --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/TokenBucketLimiter.cs @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Amqp.Framing; +using Amqp; +using System.Threading.Tasks; + +namespace Artemis.Perf +{ + + // this has been copied from Artemis' TokenBucketLimiter with some modifications + public class TokenBucketLimiterImpl { + + private int rate; + + /** + * Even thought we don't use TokenBucket in multiThread + * the implementation should keep this volatile for correctness + */ + private long last; + + /** + * Even thought we don't use TokenBucket in multiThread + * the implementation should keep this volatile for correctness + */ + private int tokens; + + public TokenBucketLimiterImpl(int rate) { + this.rate = rate; + } + private bool checkRate() { + + long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + if (last == 0) { + last = now; + } + + long diff = now - last; + + if (diff >= 1000) { + last = now; + + tokens = rate; + } + + if (tokens > 0) { + tokens--; + + return true; + } else { + return false; + } + } + + public void limit() { + if (!checkRate()) { + // Console.WriteLine("Limiting messages per max rate"); + do { + Thread.Sleep(1); + } while (!checkRate()); + } + } + } +} \ No newline at end of file diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md b/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md new file mode 100644 index 0000000000..65be3742d5 --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/readme.md @@ -0,0 +1,62 @@ +# Running the .NET AMQP example + + +# Pre-requisites: + +All of this can be done on Linux, Mac and... Windows + +- Install .NET + +https://www.microsoft.com/net/core + + +- Visual Studio Code is free and may be useful: + +https://code.visualstudio.com + + +- Powershell might be also useful: + +https://github.com/PowerShell/PowerShell/ + + + +# running the example + +- Create and start the broker, by running: + +```bash +./start-server.sh +``` + +This broker is created by simply using the CLI. you may do it manually if you like: + +```bash +artemis create ./server1 --user a --password a --role a --allow-anonymous --force +cd server1/bin +./artemis run +``` + +- Compile the code + +You need call restore to download AMQP Library and build it. +Restore is part of NuGET which is sort of the Maven Repo for Java devs. + +```sh +dotnet restore +dotnet build +dotnet run +``` + +Or simply use the run-example.sh script on this directory + +- Debugging + +Visual Studio Code will make it fairly easy to do it + + +# About this example + +This is sending messages, limited to 25K messages a second. +The consumer will have a pool of consumers, which will synchronously acknowledge messages. +.NET threading model is expensive, this example shows how to make most of your resources by a pool of consumers. diff --git a/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh b/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh new file mode 100755 index 0000000000..4b50a0cb0b --- /dev/null +++ b/examples/protocols/amqp/dotnet/HighPerformanceLoad/start-server.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setting the script to fail if anything goes wrong +set -e + +rm -rf ./server1 +../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force +./target/server0/bin/artemis run +#sleep 1 +#../../../../target/clustered-static-node1/bin/artemis run | tee server2.log & +#sleep 1 +#../../../../target/clustered-static-node2/bin/artemis run | tee server3.log & +#sleep 1