activemq-artemis/examples/protocols/amqp/dotnet/HighPerformanceLoad/Producer.cs

97 lines
3.4 KiB
C#

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Amqp.Framing;
using Amqp;
using System.Threading.Tasks;
namespace Artemis.Perf
{
/// <summary>
/// Load-generating AMQP producer used by the high-performance example.
/// Each instance opens its own connection, sends a fixed number of durable
/// messages to one queue from a background task, counts the outcomes it
/// receives, and closes the connection once the run is finished.
/// </summary>
public class Producer
{
    // Upper bound on how long the background task waits, after the last send,
    // for the remaining outcome callbacks before it closes the link anyway.
    private static readonly TimeSpan OutcomeTimeout = TimeSpan.FromSeconds(60);

    /// <summary>
    /// Number of outcome callbacks received across all Producer instances.
    /// Updated with Interlocked; read it with Interlocked.Read.
    /// </summary>
    public static long totalSent;

    readonly string name;
    readonly string addr;
    readonly string queue;
    readonly int numberOfMessages;
    readonly int messagesPerSecond;

    // Number of outcome callbacks received by this instance (Interlocked only).
    long messagesSent;

    /// <summary>Creates a producer; nothing is connected until <see cref="produce"/> is called.</summary>
    /// <param name="name">Label for this producer, used in diagnostic output.</param>
    /// <param name="addr">Address string handed to the Amqp.Address constructor.</param>
    /// <param name="queue">Target address of the sender link.</param>
    /// <param name="numberOfMessages">How many messages a call to produce() sends.</param>
    /// <param name="messagesPerSecond">Rate passed to the token-bucket limiter.</param>
    public Producer(string name, string addr, string queue, int numberOfMessages, int messagesPerSecond)
    {
        this.name = name;
        this.addr = addr;
        this.queue = queue;
        this.numberOfMessages = numberOfMessages;
        this.messagesPerSecond = messagesPerSecond;
    }

    /// <summary>
    /// Connects to the broker and starts a long-running background task that
    /// sends <c>numberOfMessages</c> durable messages, throttled by the limiter.
    /// The method returns as soon as the task has been started. The task waits
    /// (bounded by a timeout) for the outstanding outcomes and then closes the
    /// sender link, session and connection. Failures inside the task are
    /// written to the error stream instead of being silently dropped.
    /// </summary>
    public void produce()
    {
        Address address = new Address(addr);
        Connection connection = new Connection(address);
        Session session = new Session(connection);
        SenderLink sender = new SenderLink(session, "sender", queue);

        // Per-call outcome counter so the task knows when it is safe to close.
        // The event starts signalled when there is nothing to send. It is
        // deliberately not disposed: a late callback may still call Set().
        long outcomes = 0;
        ManualResetEventSlim allOutcomesReceived = new ManualResetEventSlim(numberOfMessages <= 0);

        OutcomeCallback callback = (l, msg, o, s) =>
        {
            Interlocked.Increment(ref messagesSent);
            Interlocked.Increment(ref totalSent);
            if (Interlocked.Increment(ref outcomes) >= numberOfMessages)
            {
                allOutcomesReceived.Set();
            }
        };

        // Limits the number of messages per second we are sending.
        TokenBucketLimiterImpl tokens = new TokenBucketLimiterImpl(messagesPerSecond);

        // To debug protocol traffic, enable tracing before connecting, e.g.:
        //   Trace.TraceLevel = TraceLevel.Frame | TraceLevel.Error;
        //   Trace.TraceListener = (l, f, o) => Console.WriteLine(string.Format(f, o));

        Task.Factory.StartNew(() =>
        {
            try
            {
                Console.WriteLine("Sending {0} messages...", numberOfMessages);
                for (var i = 0; i < numberOfMessages; i++)
                {
                    tokens.limit();
                    Message message = new Message("a message!" + i);
                    message.Header = new Header();
                    message.Header.Durable = true;
                    // Passing a callback keeps the loop from waiting on each
                    // outcome, so sending happens as fast as the limiter allows.
                    sender.Send(message, callback, null);
                }
                Console.WriteLine(".... Done sending");

                // Closing immediately could drop outcomes that are still in
                // flight, so give them a bounded amount of time to arrive.
                if (!allOutcomesReceived.Wait(OutcomeTimeout))
                {
                    Console.Error.WriteLine(
                        "Producer {0} on queue {1}: timed out waiting for outcomes ({2} of {3} received)",
                        name, queue, Interlocked.Read(ref outcomes), numberOfMessages);
                }

                sender.Close();
                session.Close();
            }
            catch (Exception e)
            {
                // Without this the fault would vanish with the unobserved task.
                Console.Error.WriteLine("Producer {0} on queue {1} failed: {2}", name, queue, e);
            }
            finally
            {
                try
                {
                    connection.Close();
                }
                catch (Exception e)
                {
                    Console.Error.WriteLine("Producer {0} on queue {1}: error closing connection: {2}", name, queue, e);
                }
            }
        }, TaskCreationOptions.LongRunning);
    }
}
}