mirror of https://github.com/apache/druid.git
Fix wrong counter getFailedSendingTimeCounter method (#6793)
* Fix wrong counter getFailedSendingTimeCounter method * Add testcases * Add getTimeSumAndCount for testcases
This commit is contained in:
parent
114a9fc38f
commit
e8ddd9942d
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.java.util.emitter.core;
|
package org.apache.druid.java.util.emitter.core;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.primitives.UnsignedInts;
|
import com.google.common.primitives.UnsignedInts;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -66,6 +67,12 @@ public class ConcurrentTimeCounter
|
||||||
} while (!this.min.compareAndSet(min, UnsignedInts.toLong(time)));
|
} while (!this.min.compareAndSet(min, UnsignedInts.toLong(time)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getTimeSumAndCount()
|
||||||
|
{
|
||||||
|
return timeSumAndCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
public long getTimeSumAndCountAndReset()
|
public long getTimeSumAndCountAndReset()
|
||||||
{
|
{
|
||||||
return timeSumAndCount.getAndSet(0L);
|
return timeSumAndCount.getAndSet(0L);
|
||||||
|
|
|
@ -927,7 +927,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
|
||||||
|
|
||||||
public ConcurrentTimeCounter getFailedSendingTimeCounter()
|
public ConcurrentTimeCounter getFailedSendingTimeCounter()
|
||||||
{
|
{
|
||||||
return emittingThread.successfulSendingTimeCounter;
|
return emittingThread.failedSendingTimeCounter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -382,6 +382,8 @@ public class EmitterTest
|
||||||
final UnitEvent event2 = new UnitEvent("test", 2);
|
final UnitEvent event2 = new UnitEvent("test", 2);
|
||||||
emitter = sizeBasedEmitter(1);
|
emitter = sizeBasedEmitter(1);
|
||||||
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertEquals(0, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
Assert.assertEquals(0, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
|
||||||
httpClient.setGoHandler(
|
httpClient.setGoHandler(
|
||||||
new GoHandler()
|
new GoHandler()
|
||||||
|
@ -401,6 +403,8 @@ public class EmitterTest
|
||||||
|
|
||||||
// Failed to emit the first event.
|
// Failed to emit the first event.
|
||||||
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertEquals(0, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
Assert.assertTrue(emitter.getFailedSendingTimeCounter().getTimeSumAndCount() > 0);
|
||||||
|
|
||||||
httpClient.setGoHandler(
|
httpClient.setGoHandler(
|
||||||
new GoHandler()
|
new GoHandler()
|
||||||
|
@ -423,6 +427,8 @@ public class EmitterTest
|
||||||
|
|
||||||
// Succeed to emit both events.
|
// Succeed to emit both events.
|
||||||
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0);
|
||||||
|
Assert.assertTrue(emitter.getFailedSendingTimeCounter().getTimeSumAndCount() > 0);
|
||||||
|
|
||||||
Assert.assertTrue(httpClient.succeeded());
|
Assert.assertTrue(httpClient.succeeded());
|
||||||
}
|
}
|
||||||
|
@ -491,6 +497,8 @@ public class EmitterTest
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
emitter = manualFlushEmitterWithBatchSize(1024 * 1024);
|
emitter = manualFlushEmitterWithBatchSize(1024 * 1024);
|
||||||
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(0, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertEquals(0, emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
Assert.assertEquals(0, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
|
||||||
httpClient.setGoHandler(
|
httpClient.setGoHandler(
|
||||||
new GoHandler()
|
new GoHandler()
|
||||||
|
@ -522,10 +530,14 @@ public class EmitterTest
|
||||||
}
|
}
|
||||||
waitForEmission(emitter, 1);
|
waitForEmission(emitter, 1);
|
||||||
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(2, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0);
|
||||||
|
Assert.assertEquals(0, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
|
||||||
|
|
||||||
emitter.flush();
|
emitter.flush();
|
||||||
waitForEmission(emitter, 2);
|
waitForEmission(emitter, 2);
|
||||||
Assert.assertEquals(4, emitter.getTotalEmittedEvents());
|
Assert.assertEquals(4, emitter.getTotalEmittedEvents());
|
||||||
|
Assert.assertTrue(emitter.getSuccessfulSendingTimeCounter().getTimeSumAndCount() > 0);
|
||||||
|
Assert.assertEquals(0, emitter.getFailedSendingTimeCounter().getTimeSumAndCount());
|
||||||
closeNoFlush(emitter);
|
closeNoFlush(emitter);
|
||||||
Assert.assertTrue(httpClient.succeeded());
|
Assert.assertTrue(httpClient.succeeded());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue