This commit is contained in:
franz1981 2022-03-03 16:49:57 +01:00
commit 7effa488c2
1 changed files with 7 additions and 4 deletions

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.cli.commands.messages.perf; package org.apache.activemq.artemis.cli.commands.messages.perf;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.concurrent.OrderedEventExecutor;
@ -24,7 +23,9 @@ import org.HdrHistogram.SingleWriterRecorder;
public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadGenerator { public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadGenerator {
private final long usPeriod; private final double usPeriod;
private long startLoadMicros;
private long fireCount;
private long fireTimeMicros; private long fireTimeMicros;
private boolean started; private boolean started;
@ -39,7 +40,7 @@ public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadG
final SingleWriterRecorder waitLatencies) { final SingleWriterRecorder waitLatencies) {
super(producer, executor, timeProvider, keepOnSending, group, msgContent, sendCompletedLatencies, waitLatencies); super(producer, executor, timeProvider, keepOnSending, group, msgContent, sendCompletedLatencies, waitLatencies);
this.fireTimeMicros = 0; this.fireTimeMicros = 0;
this.usPeriod = TimeUnit.NANOSECONDS.toMicros(nsPeriod); this.usPeriod = nsPeriod / 1000d;
this.started = false; this.started = false;
} }
@ -51,6 +52,7 @@ public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadG
final long now = timeProvider.now(); final long now = timeProvider.now();
if (!started) { if (!started) {
started = true; started = true;
startLoadMicros = now;
fireTimeMicros = now; fireTimeMicros = now;
} }
if (!trySend(fireTimeMicros, now)) { if (!trySend(fireTimeMicros, now)) {
@ -61,7 +63,8 @@ public final class ProducerTargetRateLoadGenerator extends SkeletalProducerLoadG
stopLoad = true; stopLoad = true;
return; return;
} }
fireTimeMicros += usPeriod; fireCount++;
fireTimeMicros = startLoadMicros + (long) (fireCount * usPeriod);
final long delay = fireTimeMicros - timeProvider.now(); final long delay = fireTimeMicros - timeProvider.now();
final long usToNextFireTime = Math.max(0, delay); final long usToNextFireTime = Math.max(0, delay);
asyncContinue(usToNextFireTime); asyncContinue(usToNextFireTime);