Issue #472 Use LongAdder for statistics

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2016-04-01 17:22:45 +11:00
parent 77a6514063
commit 91bef55924
5 changed files with 160 additions and 79 deletions

View File

@ -24,8 +24,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -54,8 +54,8 @@ public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable,
private final SampleStatistic _messagesOut = new SampleStatistic();
private final SampleStatistic _connectionDurationStats = new SampleStatistic();
private final ConcurrentMap<Connection, Sample> _samples = new ConcurrentHashMap<>();
private final AtomicInteger _closedIn = new AtomicInteger();
private final AtomicInteger _closedOut = new AtomicInteger();
private final LongAdder _closedIn = new LongAdder();
private final LongAdder _closedOut = new LongAdder();
private AtomicLong _nanoStamp=new AtomicLong();
private volatile int _messagesInPerSecond;
private volatile int _messagesOutPerSecond;
@ -85,8 +85,8 @@ public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable,
Sample sample=_samples.remove(connection);
if (sample!=null)
{
_closedIn.addAndGet(msgsIn-sample._messagesIn);
_closedOut.addAndGet(msgsOut-sample._messagesOut);
_closedIn.add(msgsIn-sample._messagesIn);
_closedOut.add(msgsOut-sample._messagesOut);
}
}
}
@ -267,8 +267,8 @@ public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable,
{
if (_nanoStamp.compareAndSet(then,now))
{
long msgsIn=_closedIn.getAndSet(0);
long msgsOut=_closedOut.getAndSet(0);
long msgsIn=_closedIn.sumThenReset();
long msgsOut=_closedOut.sumThenReset();
for (Map.Entry<Connection, Sample> entry : _samples.entrySet())
{

View File

@ -22,9 +22,9 @@ import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -60,15 +60,15 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final SampleStatistic _dispatchedTimeStats = new SampleStatistic();
private final CounterStatistic _asyncWaitStats = new CounterStatistic();
private final AtomicInteger _asyncDispatches = new AtomicInteger();
private final AtomicInteger _expires = new AtomicInteger();
private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final AtomicInteger _responses1xx = new AtomicInteger();
private final AtomicInteger _responses2xx = new AtomicInteger();
private final AtomicInteger _responses3xx = new AtomicInteger();
private final AtomicInteger _responses4xx = new AtomicInteger();
private final AtomicInteger _responses5xx = new AtomicInteger();
private final AtomicLong _responsesTotalBytes = new AtomicLong();
private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
private final LongAdder _responses3xx = new LongAdder();
private final LongAdder _responses4xx = new LongAdder();
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();
private final AtomicReference<FutureCallback> _shutdown=new AtomicReference<>();
@ -79,7 +79,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
_expires.incrementAndGet();
_expires.increment();
}
@Override
@ -132,14 +132,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
_dispatchedTimeStats.reset();
_asyncWaitStats.reset();
_asyncDispatches.set(0);
_expires.set(0);
_responses1xx.set(0);
_responses2xx.set(0);
_responses3xx.set(0);
_responses4xx.set(0);
_responses5xx.set(0);
_responsesTotalBytes.set(0L);
_asyncDispatches.reset();
_expires.reset();
_responses1xx.reset();
_responses2xx.reset();
_responses3xx.reset();
_responses4xx.reset();
_responses5xx.reset();
_responsesTotalBytes.reset();
}
@Override
@ -159,7 +159,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
{
// resumed request
start = System.currentTimeMillis();
_asyncDispatches.incrementAndGet();
_asyncDispatches.increment();
}
try
@ -221,19 +221,19 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
switch (response.getStatus() / 100)
{
case 1:
_responses1xx.incrementAndGet();
_responses1xx.increment();
break;
case 2:
_responses2xx.incrementAndGet();
_responses2xx.increment();
break;
case 3:
_responses3xx.incrementAndGet();
_responses3xx.increment();
break;
case 4:
_responses4xx.incrementAndGet();
_responses4xx.increment();
break;
case 5:
_responses5xx.incrementAndGet();
_responses5xx.increment();
break;
default:
break;
@ -241,8 +241,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
}
else
// will fall through to not found handler
_responses4xx.incrementAndGet();
_responsesTotalBytes.addAndGet(response.getContentCount());
_responses4xx.increment();
_responsesTotalBytes.add(response.getContentCount());
}
@Override
@ -454,7 +454,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requested that have been asynchronously dispatched")
public int getAsyncDispatches()
{
return _asyncDispatches.get();
return _asyncDispatches.intValue();
}
/**
@ -464,7 +464,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of async requests requests that have expired")
public int getExpires()
{
return _expires.get();
return _expires.intValue();
}
/**
@ -474,7 +474,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requests with 1xx response status")
public int getResponses1xx()
{
return _responses1xx.get();
return _responses1xx.intValue();
}
/**
@ -484,7 +484,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requests with 2xx response status")
public int getResponses2xx()
{
return _responses2xx.get();
return _responses2xx.intValue();
}
/**
@ -494,7 +494,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requests with 3xx response status")
public int getResponses3xx()
{
return _responses3xx.get();
return _responses3xx.intValue();
}
/**
@ -504,7 +504,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requests with 4xx response status")
public int getResponses4xx()
{
return _responses4xx.get();
return _responses4xx.intValue();
}
/**
@ -514,7 +514,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("number of requests with 5xx response status")
public int getResponses5xx()
{
return _responses5xx.get();
return _responses5xx.intValue();
}
/**
@ -532,7 +532,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@ManagedAttribute("total number of bytes across all responses")
public long getResponsesBytesTotal()
{
return _responsesTotalBytes.get();
return _responsesTotalBytes.longValue();
}
public String toStatsHTML()

View File

@ -19,9 +19,8 @@
package org.eclipse.jetty.util.statistic;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.Atomics;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
/* ------------------------------------------------------------ */
/** Statistics on a counter value.
@ -33,30 +32,30 @@ import org.eclipse.jetty.util.Atomics;
*/
public class CounterStatistic
{
protected final AtomicLong _max = new AtomicLong();
protected final AtomicLong _curr = new AtomicLong();
protected final AtomicLong _total = new AtomicLong();
protected final LongAccumulator _max = new LongAccumulator(Math::max,0L);
protected final AtomicLong _current = new AtomicLong();
protected final LongAdder _total = new LongAdder();
/* ------------------------------------------------------------ */
public void reset()
{
_total.set(0);
_max.set(0);
long current=_curr.get();
_total.addAndGet(current);
Atomics.updateMax(_max,current);
_total.reset();
_max.reset();
long current=_current.get();
_total.add(current);
_max.accumulate(current);
}
/* ------------------------------------------------------------ */
public void reset(final long value)
{
_total.set(0);
_max.set(0);
_curr.set(value);
_current.set(value);
_total.reset();
_max.reset();
if (value>0)
{
_total.addAndGet(value);
Atomics.updateMax(_max,value);
_total.add(value);
_max.accumulate(value);
}
}
@ -67,11 +66,11 @@ public class CounterStatistic
*/
public long add(final long delta)
{
long value=_curr.addAndGet(delta);
long value=_current.addAndGet(delta);
if (delta > 0)
{
_total.addAndGet(delta);
Atomics.updateMax(_max,value);
_total.add(delta);
_max.accumulate(value);
}
return value;
}
@ -83,7 +82,10 @@ public class CounterStatistic
*/
public long increment()
{
return add(1);
long value=_current.incrementAndGet();
_total.increment();
_max.accumulate(value);
return value;
}
/* ------------------------------------------------------------ */
@ -93,7 +95,7 @@ public class CounterStatistic
*/
public long decrement()
{
return add(-1);
return _current.decrementAndGet();
}
/* ------------------------------------------------------------ */
@ -111,7 +113,7 @@ public class CounterStatistic
*/
public long getCurrent()
{
return _curr.get();
return _current.get();
}
/* ------------------------------------------------------------ */
@ -120,13 +122,13 @@ public class CounterStatistic
*/
public long getTotal()
{
return _total.get();
return _total.sum();
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%x{c=%d,m=%d,t=%d}",this.getClass().getSimpleName(),hashCode(),_curr.get(),_max.get(),_total.get());
return String.format("%s@%x{c=%d,m=%d,t=%d}",this.getClass().getSimpleName(),hashCode(),_current.get(),_max.get(),_total.sum());
}
}

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.util.statistic;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.Atomics;
@ -40,17 +42,17 @@ import org.eclipse.jetty.util.Atomics;
*/
public class SampleStatistic
{
protected final AtomicLong _max = new AtomicLong();
protected final LongAccumulator _max = new LongAccumulator(Math::max,0L);
protected final AtomicLong _total = new AtomicLong();
protected final AtomicLong _count = new AtomicLong();
protected final AtomicLong _totalVariance100 = new AtomicLong();
protected final LongAdder _totalVariance100 = new LongAdder();
public void reset()
{
_max.set(0);
_max.reset();
_total.set(0);
_count.set(0);
_totalVariance100.set(0);
_totalVariance100.reset();
}
public void set(final long sample)
@ -62,10 +64,10 @@ public class SampleStatistic
{
long mean10 = total*10/count;
long delta10 = sample*10 - mean10;
_totalVariance100.addAndGet(delta10*delta10);
_totalVariance100.add(delta10*delta10);
}
Atomics.updateMax(_max, sample);
_max.accumulate(sample);
}
/**
@ -93,7 +95,7 @@ public class SampleStatistic
public double getVariance()
{
final long variance100 = _totalVariance100.get();
final long variance100 = _totalVariance100.sum();
final long count = _count.get();
return count>1?((double)variance100)/100.0/(count-1):0.0;
@ -108,6 +110,6 @@ public class SampleStatistic
@Override
public String toString()
{
return String.format("%s@%x{c=%d,m=%d,t=%d,v100=%d}",this.getClass().getSimpleName(),hashCode(),_count.get(),_max.get(),_total.get(),_totalVariance100.get());
return String.format("%s@%x{c=%d,m=%d,t=%d,v100=%d}",this.getClass().getSimpleName(),hashCode(),_count.get(),_max.get(),_total.get(),_totalVariance100.sum());
}
}

View File

@ -19,13 +19,13 @@
package org.eclipse.jetty.util.statistic;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import org.hamcrest.Matchers;
import org.junit.Assert;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import org.junit.Test;
@ -77,4 +77,81 @@ public class CounterStatisticTest
assertThat(count.getTotal(),equalTo(5L));
}
@Test
public void testCounterContended()
throws Exception
{
final CounterStatistic counter = new CounterStatistic();
final int N=100;
final int L=1000;
final Thread[] threads = new Thread[N];
final CyclicBarrier incBarrier = new CyclicBarrier(N);
final CountDownLatch decBarrier = new CountDownLatch(N/2);
for (int i=N;i-->0;)
{
final int I = i;
threads[i]=(i>=N/2)
?new Thread()
{
@Override
public void run()
{
try
{
incBarrier.await();
decBarrier.await();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
Random random = new Random();
for (int l=L;l-->0;)
{
counter.decrement();
if (random.nextInt(5)==0)
Thread.yield();
}
}
}
:new Thread()
{
@Override
public void run()
{
try
{
incBarrier.await();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
Random random = new Random();
for (int l=L;l-->0;)
{
counter.increment();
if (l==L/2)
decBarrier.countDown();
if (random.nextInt(5)==0)
Thread.yield();
}
}
};
threads[i].start();
}
for (int i=N;i-->0;)
threads[i].join();
assertThat(counter.getCurrent(),equalTo(0L));
assertThat(counter.getTotal(),equalTo(N*L/2L));
assertThat(counter.getMax(),greaterThanOrEqualTo((N/2)*(L/2L)));
}
}