This commit is contained in:
Clebert Suconic 2018-06-22 10:36:14 -04:00
commit faf99cd68f
7 changed files with 480 additions and 0 deletions

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;
namespace Artemis.Perf
{
class App
{
static long ReceivedMessages = 0;
static void TheCallback(int id, Session session, ReceiverLink link, Message message)
{
Interlocked.Increment(ref ReceivedMessages);
link.Accept(message);
}
static void Main(string[] args) {
if (args.Length == 0) {
args = new string[1];
args[0] = "amqp://127.0.0.1:5672";
}
// it will start one client towards each server
for (int i = 0; i < args.Length; i++) {
string addr0 = args.Length >= 1 ? args[0] : "amqp://127.0.0.1:5672";
processOn(addr0, "orders", 100000000, 25000, "p1");
}
while (true) {
long previousRead = Interlocked.Read(ref ReceivedMessages);
long previousSent = Interlocked.Read(ref Producer.totalSent);
Thread.Sleep(1000);
long currentRead = Interlocked.Read(ref ReceivedMessages);
long currentSent = Interlocked.Read(ref Producer.totalSent);
Console.WriteLine("Received: " + currentRead + " TotalSent: " +
currentSent);
Console.WriteLine("Rate reading: " + (currentRead - previousRead) + ", Rate sending: " + (currentSent - previousSent));
}
}
static void processOn(string addr, string queue, int totalSend, int maxRateSend, String processName) {
Address address = new Address(addr);
Connection connection = new Connection(address);
ReceiverPool pool = new ReceiverPool(connection, 1, queue, 200, TheCallback);
pool.start();
Producer Producer = new Producer(processName, addr, queue, totalSend, maxRateSend);
Producer.produce(); // this will start an asynchronous producer
}
}
}

View File

@ -0,0 +1,29 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.1.3" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;
namespace Artemis.Perf
{
public class Producer
{
string name;
string addr;
string queue;
int numberOfMessages;
int messagesPerSecond;
long messagesSent;
public static long totalSent;
public Producer(string name, string addr, string queue, int numberOfMessages, int messagesPerSecond) {
this.name = name;
this.addr = addr;
this.queue = queue;
this.numberOfMessages = numberOfMessages;
this.messagesPerSecond = messagesPerSecond;
}
public void produce() {
Address address = new Address(addr);
Connection connection = new Connection(address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender", queue);
OutcomeCallback callback = (l, msg, o, s) => {
Interlocked.Increment(ref messagesSent);
Interlocked.Increment(ref totalSent);
};
// This is just to limit the number of messages per second we are sending
TokenBucketLimiterImpl tokens = new TokenBucketLimiterImpl(messagesPerSecond);
Task.Factory.StartNew(() => {
Console.WriteLine("Sending {0} messages...", numberOfMessages);
for (var i = 0; i < numberOfMessages; i++)
{
tokens.limit();
Message message = new Message("a message!" + i);
message.Header = new Header();
message.Header.Durable = true;
// The callback here is to make the sending to happen as fast as possible
sender.Send(message, callback, null);
}
Console.WriteLine(".... Done sending");
}, TaskCreationOptions.LongRunning);
// Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Error |
// TraceLevel.Frame | TraceLevel.Information | TraceLevel.Warning;
// Trace.TraceListener = (l, f, o) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o));
// sender.Close();
// Task.Factory.StartNew(() => {
// while (true) {
// Console.WriteLine("Sent " + Interlocked.Read(ref messagesSent) + " on queue " + queue + " producer " + this.name);
// Thread.Sleep(1000);
// }
// }, TaskCreationOptions.LongRunning);
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;
namespace Artemis.Perf
{
/**
* This class will start many consumers underneath it to satisfy a pool of consumers
* While calling a single callback for when messages are received.
*/
public class ReceiverPool
{
public delegate void MessageReceived(int id, Session session, ReceiverLink link, Message msg);
public int MessagesReceived;
MessageReceived _callback;
int _Workers;
private Object receiverLock = new Object();
private Boolean running = true;
private ReceiverLink[] Receivers;
private Session[] Sessions;
private Connection _Connection;
private int Credits;
public ReceiverPool(Connection Connection, int Workers, String queue, int Credits, MessageReceived callback)
{
this._Connection = Connection;
this.Receivers = new ReceiverLink[Workers];
this.Sessions = new Session[Workers];
for (int i = 0; i < Workers; i++)
{
// I was playing with using a single session versus multiple sessions
if (i == 0) {
Sessions[i] = new Session(Connection);
}
else {
Sessions[i] = Sessions[0];
}
Receivers[i] = new ReceiverLink(Sessions[i], "receiver " + queue + " " + i, queue);
}
this._Workers = Workers;
this._callback = callback;
this.Credits = Credits;
}
public void stop() {
running = false;
for (int i = 0; i < _Workers; i++) {
Receivers[i].Close();
Sessions[i].Close();
}
}
public void start() {
for (int i = 0; i < _Workers; i++) {
{
// This variable exists otherwise we would get an olderValue of i
int value = i;
Task.Factory.StartNew(() => WorkerRun(value), TaskCreationOptions.LongRunning);
}
}
}
void WorkerRun(int i) {
try {
Receivers[i].SetCredit(Credits);
while (running)
{
Message theMessage = Receivers[i].Receive(TimeSpan.FromSeconds(1));
if (theMessage != null)
{
_callback(i, Sessions[i], Receivers[i], theMessage);
}
}
} catch (Exception e) {
Console.WriteLine(e);
}
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;
namespace Artemis.Perf
{
// this has been copied from Artemis' TokenBucketLimiter with some modifications
public class TokenBucketLimiterImpl {
private int rate;
/**
* Even thought we don't use TokenBucket in multiThread
* the implementation should keep this volatile for correctness
*/
private long last;
/**
* Even thought we don't use TokenBucket in multiThread
* the implementation should keep this volatile for correctness
*/
private int tokens;
public TokenBucketLimiterImpl(int rate) {
this.rate = rate;
}
private bool checkRate() {
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (last == 0) {
last = now;
}
long diff = now - last;
if (diff >= 1000) {
last = now;
tokens = rate;
}
if (tokens > 0) {
tokens--;
return true;
} else {
return false;
}
}
public void limit() {
if (!checkRate()) {
// Console.WriteLine("Limiting messages per max rate");
do {
Thread.Sleep(1);
} while (!checkRate());
}
}
}
}

View File

@ -0,0 +1,62 @@
# Running the .NET AMQP example
# Pre-requisites:
All of this can be done on Linux, Mac and... Windows
- Install .NET
https://www.microsoft.com/net/core
- Visual Studio Code is free and may be useful:
https://code.visualstudio.com
- Powershell might be also useful:
https://github.com/PowerShell/PowerShell/
# running the example
- Create and start the broker, by running:
```bash
./start-server.sh
```
This broker is created by simply using the CLI. you may do it manually if you like:
```bash
artemis create ./server1 --user a --password a --role a --allow-anonymous --force
cd server1/bin
./artemis run
```
- Compile the code
You need call restore to download AMQP Library and build it.
Restore is part of NuGET which is sort of the Maven Repo for Java devs.
```sh
dotnet restore
dotnet build
dotnet run
```
Or simply use the run-example.sh script on this directory
- Debugging
Visual Studio Code will make it fairly easy to do it
# About this example
This is sending messages, limited to 25K messages a second.
The consumer will have a pool of consumers, which will synchronously acknowledge messages.
.NET threading model is expensive, this example shows how to make most of your resources by a pool of consumers.

View File

@ -0,0 +1,29 @@
#!/usr/bin/env sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Setting the script to fail if anything goes wrong
set -e
rm -rf ./server1
../../../../../bin/artemis create ./server1 --user a --password a --role a --allow-anonymous --force
./target/server0/bin/artemis run
#sleep 1
#../../../../target/clustered-static-node1/bin/artemis run | tee server2.log &
#sleep 1
#../../../../target/clustered-static-node2/bin/artemis run | tee server3.log &
#sleep 1