merging #95 - performance improvements
This commit is contained in:
commit
b8db8b0514
|
@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer
|
||||||
protected ByteBuf buffer; // NO_UCD (use final)
|
protected ByteBuf buffer; // NO_UCD (use final)
|
||||||
private final boolean releasable;
|
private final boolean releasable;
|
||||||
|
|
||||||
|
public static ByteBuf unwrap(ByteBuf buffer)
|
||||||
|
{
|
||||||
|
ByteBuf parent;
|
||||||
|
while ((parent = buffer.unwrap()) != null &&
|
||||||
|
parent != buffer) // this last part is just in case the semantic
|
||||||
|
{ // ever changes where unwrap is returning itself
|
||||||
|
buffer = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
public ChannelBufferWrapper(final ByteBuf buffer)
|
public ChannelBufferWrapper(final ByteBuf buffer)
|
||||||
{
|
{
|
||||||
this(buffer, false);
|
this(buffer, false);
|
||||||
|
|
|
@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
|
||||||
|
|
||||||
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
|
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
|
||||||
{
|
{
|
||||||
super(buffer.byteBuf());
|
// a wrapped inside a wrapper will increase the stack size.
|
||||||
|
// we fixed this here due to some profiling testing
|
||||||
|
super(unwrap(buffer.byteBuf()));
|
||||||
|
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
|
|
||||||
|
|
|
@ -85,6 +85,4 @@ public interface Packet
|
||||||
* @return true if confirmation is required
|
* @return true if confirmation is required
|
||||||
*/
|
*/
|
||||||
boolean isRequiresConfirmations();
|
boolean isRequiresConfirmations();
|
||||||
|
|
||||||
boolean isAsyncExec();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
|
||||||
{
|
{
|
||||||
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
|
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
|
||||||
String handshake = "HORNETQ";
|
String handshake = "HORNETQ";
|
||||||
ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length());
|
ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
|
||||||
amqbuffer.writeBytes(handshake.getBytes());
|
amqbuffer.writeBytes(handshake.getBytes());
|
||||||
transportConnection.write(amqbuffer);
|
transportConnection.write(amqbuffer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,7 @@ public class PacketImpl implements Packet
|
||||||
|
|
||||||
public ActiveMQBuffer encode(final RemotingConnection connection)
|
public ActiveMQBuffer encode(final RemotingConnection connection)
|
||||||
{
|
{
|
||||||
ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE);
|
ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
|
||||||
|
|
||||||
// The standard header fields
|
// The standard header fields
|
||||||
|
|
||||||
|
@ -333,11 +333,6 @@ public class PacketImpl implements Packet
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
|
|
|
@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
||||||
|
|
||||||
private final Object failLock = new Object();
|
private final Object failLock = new Object();
|
||||||
|
|
||||||
private volatile boolean executing;
|
|
||||||
|
|
||||||
private final SimpleString nodeID;
|
private final SimpleString nodeID;
|
||||||
|
|
||||||
private String clientID;
|
private String clientID;
|
||||||
|
@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
||||||
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
|
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (packet.isAsyncExec() && executor != null)
|
dataReceived = true;
|
||||||
{
|
doBufferReceived(packet);
|
||||||
executing = true;
|
|
||||||
|
|
||||||
executor.execute(new Runnable()
|
|
||||||
{
|
|
||||||
public void run()
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
doBufferReceived(packet);
|
|
||||||
}
|
|
||||||
catch (Throwable t)
|
|
||||||
{
|
|
||||||
ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
executing = false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
//To prevent out of order execution if interleaving sync and async operations on same connection
|
|
||||||
while (executing)
|
|
||||||
{
|
|
||||||
Thread.yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pings must always be handled out of band so we can send pings back to the client quickly
|
|
||||||
// otherwise they would get in the queue with everything else which might give an intolerable delay
|
|
||||||
doBufferReceived(packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
super.bufferReceived(connectionID, buffer);
|
super.bufferReceived(connectionID, buffer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,12 +69,6 @@ public class RollbackMessage extends PacketImpl
|
||||||
considerLastMessageAsDelivered = buffer.readBoolean();
|
considerLastMessageAsDelivered = buffer.readBoolean();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
|
|
|
@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl
|
||||||
super(SESS_COMMIT);
|
super(SESS_COMMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,12 +55,6 @@ public class SessionXACommitMessage extends PacketImpl
|
||||||
return onePhase;
|
return onePhase;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void encodeRest(final ActiveMQBuffer buffer)
|
public void encodeRest(final ActiveMQBuffer buffer)
|
||||||
{
|
{
|
||||||
|
|
|
@ -61,12 +61,6 @@ public class SessionXAPrepareMessage extends PacketImpl
|
||||||
xid = XidCodecSupport.decodeXid(buffer);
|
xid = XidCodecSupport.decodeXid(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
|
|
|
@ -62,12 +62,6 @@ public class SessionXARollbackMessage extends PacketImpl
|
||||||
xid = XidCodecSupport.decodeXid(buffer);
|
xid = XidCodecSupport.decodeXid(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAsyncExec()
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
|
|
|
@ -172,9 +172,9 @@ public class NettyConnection implements Connection
|
||||||
listener.connectionDestroyed(getID());
|
listener.connectionDestroyed(getID());
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActiveMQBuffer createBuffer(final int size)
|
public ActiveMQBuffer createTransportBuffer(final int size)
|
||||||
{
|
{
|
||||||
return new ChannelBufferWrapper(channel.alloc().buffer(size));
|
return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object getID()
|
public Object getID()
|
||||||
|
@ -199,7 +199,7 @@ public class NettyConnection implements Connection
|
||||||
{
|
{
|
||||||
channel.writeAndFlush(batchBuffer.byteBuf());
|
channel.writeAndFlush(batchBuffer.byteBuf());
|
||||||
|
|
||||||
batchBuffer = createBuffer(BATCHING_BUFFER_SIZE);
|
batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
|
|
@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector
|
||||||
}
|
}
|
||||||
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
|
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
|
||||||
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
|
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
|
||||||
bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
|
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
|
||||||
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
|
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
|
||||||
|
|
||||||
final SSLContext context;
|
final SSLContext context;
|
||||||
|
|
|
@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection
|
||||||
closeListeners.addAll(listeners);
|
closeListeners.addAll(listeners);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActiveMQBuffer createBuffer(final int size)
|
public ActiveMQBuffer createTransportBuffer(final int size)
|
||||||
{
|
{
|
||||||
return transportConnection.createBuffer(size);
|
return transportConnection.createTransportBuffer(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Connection getTransportConnection()
|
public Connection getTransportConnection()
|
||||||
|
|
|
@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* creates a new ActiveMQBuffer of the specified size.
|
* creates a new ActiveMQBuffer of the specified size.
|
||||||
|
* For the purpose of i/o outgoing packets
|
||||||
*
|
*
|
||||||
* @param size the size of buffer required
|
* @param size the size of buffer required
|
||||||
* @return the buffer
|
* @return the buffer
|
||||||
*/
|
*/
|
||||||
ActiveMQBuffer createBuffer(int size);
|
ActiveMQBuffer createTransportBuffer(int size);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* called when the underlying connection fails.
|
* called when the underlying connection fails.
|
||||||
|
|
|
@ -36,7 +36,7 @@ public interface Connection
|
||||||
* @param size the size of buffer to create
|
* @param size the size of buffer to create
|
||||||
* @return the new buffer.
|
* @return the new buffer.
|
||||||
*/
|
*/
|
||||||
ActiveMQBuffer createBuffer(int size);
|
ActiveMQBuffer createTransportBuffer(int size);
|
||||||
|
|
||||||
|
|
||||||
RemotingConnection getProtocolConnection();
|
RemotingConnection getProtocolConnection();
|
||||||
|
|
|
@ -767,7 +767,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
|
||||||
if (subscriptionName == null)
|
if (subscriptionName == null)
|
||||||
{
|
{
|
||||||
if (durability != ConsumerDurability.NON_DURABLE)
|
if (durability != ConsumerDurability.NON_DURABLE)
|
||||||
throw new RuntimeException();
|
throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
|
||||||
// Non durable sub
|
// Non durable sub
|
||||||
|
|
||||||
queueName = new SimpleString(UUID.randomUUID().toString());
|
queueName = new SimpleString(UUID.randomUUID().toString());
|
||||||
|
@ -782,7 +782,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
|
||||||
{
|
{
|
||||||
// Durable sub
|
// Durable sub
|
||||||
if (durability != ConsumerDurability.DURABLE)
|
if (durability != ConsumerDurability.DURABLE)
|
||||||
throw new RuntimeException();
|
throw new RuntimeException("Subscription name must be null for non-durable topic consumer");
|
||||||
if (connection.getClientID() == null)
|
if (connection.getClientID() == null)
|
||||||
{
|
{
|
||||||
throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");
|
throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");
|
||||||
|
|
|
@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer createBuffer(int size)
|
public ActiveMQBuffer createTransportBuffer(int size)
|
||||||
{
|
{
|
||||||
return ActiveMQBuffers.dynamicBuffer(size);
|
return ActiveMQBuffers.dynamicBuffer(size);
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer createBuffer(int size)
|
public ActiveMQBuffer createTransportBuffer(int size)
|
||||||
{
|
{
|
||||||
return ActiveMQBuffers.dynamicBuffer(size);
|
return ActiveMQBuffers.dynamicBuffer(size);
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ public class InVMConnection implements Connection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActiveMQBuffer createBuffer(final int size)
|
public ActiveMQBuffer createTransportBuffer(final int size)
|
||||||
{
|
{
|
||||||
return ActiveMQBuffers.dynamicBuffer(size);
|
return ActiveMQBuffers.dynamicBuffer(size);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQBuffer createBuffer(int size)
|
public ActiveMQBuffer createTransportBuffer(int size)
|
||||||
{
|
{
|
||||||
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
|
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
|
||||||
}
|
}
|
||||||
|
|
7
pom.xml
7
pom.xml
|
@ -79,7 +79,12 @@
|
||||||
see https://intellij-support.jetbrains.com/entries/23395793
|
see https://intellij-support.jetbrains.com/entries/23395793
|
||||||
|
|
||||||
Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
|
Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
|
||||||
-->
|
|
||||||
|
|
||||||
|
For profiling add this line and use jmc (Java Mission Control) to evaluate the results:
|
||||||
|
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr
|
||||||
|
|
||||||
|
-->
|
||||||
|
|
||||||
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
|
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
|
||||||
-Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties
|
-Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties
|
||||||
|
|
|
@ -44,9 +44,11 @@ import org.apache.activemq.tests.util.SpawnedVMSupport;
|
||||||
*/
|
*/
|
||||||
public class ClientCrashTest extends ClientTestBase
|
public class ClientCrashTest extends ClientTestBase
|
||||||
{
|
{
|
||||||
static final int PING_PERIOD = 2000;
|
// using short values so this test can run fast
|
||||||
|
static final int PING_PERIOD = 100;
|
||||||
|
|
||||||
static final int CONNECTION_TTL = 6000;
|
// using short values so this test can run fast
|
||||||
|
static final int CONNECTION_TTL = 1000;
|
||||||
|
|
||||||
// Constants -----------------------------------------------------
|
// Constants -----------------------------------------------------
|
||||||
|
|
||||||
|
@ -76,18 +78,22 @@ public class ClientCrashTest extends ClientTestBase
|
||||||
{
|
{
|
||||||
assertActiveConnections(1);
|
assertActiveConnections(1);
|
||||||
|
|
||||||
// spawn a JVM that creates a Core client, which sends a message
|
|
||||||
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
|
|
||||||
|
|
||||||
ClientSession session = sf.createSession(false, true, true);
|
ClientSession session = sf.createSession(false, true, true);
|
||||||
session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
|
session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
|
||||||
|
|
||||||
|
// spawn a JVM that creates a Core client, which sends a message
|
||||||
|
// It has to be spawned after the queue was created.
|
||||||
|
// if the client is too fast you race the send before the queue was created, missing a message
|
||||||
|
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
|
||||||
|
|
||||||
ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
|
ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
|
||||||
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
|
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
|
||||||
|
|
||||||
|
|
||||||
session.start();
|
session.start();
|
||||||
|
|
||||||
// receive a message from the queue
|
// receive a message from the queue
|
||||||
Message messageFromClient = consumer.receive(500000);
|
Message messageFromClient = consumer.receive(5000);
|
||||||
Assert.assertNotNull("no message received", messageFromClient);
|
Assert.assertNotNull("no message received", messageFromClient);
|
||||||
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
|
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
|
||||||
|
|
||||||
|
@ -155,7 +161,7 @@ public class ClientCrashTest extends ClientTestBase
|
||||||
session.start();
|
session.start();
|
||||||
|
|
||||||
// receive a message from the queue
|
// receive a message from the queue
|
||||||
ClientMessage messageFromClient = consumer.receive(10000);
|
ClientMessage messageFromClient = consumer.receive(timeout);
|
||||||
Assert.assertNotNull("no message received", messageFromClient);
|
Assert.assertNotNull("no message received", messageFromClient);
|
||||||
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
|
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
|
||||||
|
|
||||||
|
|
|
@ -59,8 +59,9 @@ public class CrashClient
|
||||||
ClientSession session = sf.createSession(false, true, true);
|
ClientSession session = sf.createSession(false, true, true);
|
||||||
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
|
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
|
||||||
|
|
||||||
|
// it has to be durable otherwise it may race dying before the client is killed
|
||||||
ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE,
|
ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE,
|
||||||
false,
|
true,
|
||||||
0,
|
0,
|
||||||
System.currentTimeMillis(),
|
System.currentTimeMillis(),
|
||||||
(byte)1);
|
(byte)1);
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class CrashClient2
|
||||||
ClientSession session = sf.createSession(true, true, 1000000);
|
ClientSession session = sf.createSession(true, true, 1000000);
|
||||||
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE2);
|
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE2);
|
||||||
|
|
||||||
ClientMessage message = session.createMessage(false);
|
ClientMessage message = session.createMessage(true);
|
||||||
message.getBodyBuffer().writeString(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT);
|
message.getBodyBuffer().writeString(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT);
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
|
@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[])
|
* @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[])
|
||||||
*/
|
*/
|
||||||
public ActiveMQBuffer createBuffer(final byte[] bytes)
|
public ActiveMQBuffer createBuffer(final byte[] bytes)
|
||||||
{
|
{
|
||||||
|
@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int)
|
* @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int)
|
||||||
*/
|
*/
|
||||||
public ActiveMQBuffer createBuffer(final int size)
|
public ActiveMQBuffer createBuffer(final int size)
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,247 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.tests.performance.sends;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.activemq.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.api.jms.ActiveMQJMSClient;
|
||||||
|
import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
|
||||||
|
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.tests.util.JMSTestBase;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Client-ack time
|
||||||
|
*
|
||||||
|
* @author Clebert Suconic
|
||||||
|
*/
|
||||||
|
public abstract class AbstractSendReceivePerfTest extends JMSTestBase
|
||||||
|
{
|
||||||
|
protected static final String Q_NAME = "test-queue-01";
|
||||||
|
private Queue queue;
|
||||||
|
|
||||||
|
protected AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME);
|
||||||
|
queue = ActiveMQJMSClient.createQueue(Q_NAME);
|
||||||
|
|
||||||
|
AddressSettings settings = new AddressSettings();
|
||||||
|
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
|
||||||
|
settings.setMaxSizeBytes(Long.MAX_VALUE);
|
||||||
|
server.getAddressSettingsRepository().clear();
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", settings);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void registerConnectionFactory() throws Exception
|
||||||
|
{
|
||||||
|
List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
|
||||||
|
connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
|
||||||
|
|
||||||
|
createCF(connectorConfigs, "/cf");
|
||||||
|
|
||||||
|
cf = (ConnectionFactory) namingContext.lookup("/cf");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName());
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendReceive() throws Exception
|
||||||
|
{
|
||||||
|
long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000);
|
||||||
|
|
||||||
|
|
||||||
|
MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples);
|
||||||
|
receiver.start();
|
||||||
|
MessageSender sender = new MessageSender(Q_NAME);
|
||||||
|
sender.start();
|
||||||
|
|
||||||
|
receiver.join();
|
||||||
|
sender.join();
|
||||||
|
|
||||||
|
assertFalse(receiver.failed);
|
||||||
|
assertFalse(sender.failed);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
final Semaphore pendingCredit = new Semaphore(5000);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* to be called after a message is consumed
|
||||||
|
* so the flow control of the test kicks in.
|
||||||
|
*/
|
||||||
|
protected final void afterConsume(Message message)
|
||||||
|
{
|
||||||
|
if (message != null)
|
||||||
|
{
|
||||||
|
pendingCredit.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected final void beforeSend()
|
||||||
|
{
|
||||||
|
while (running.get())
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
System.out.println("Couldn't get credits!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
throw new RuntimeException(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private class MessageReceiver extends Thread
|
||||||
|
{
|
||||||
|
private final String qName;
|
||||||
|
private final long numberOfSamples;
|
||||||
|
|
||||||
|
public boolean failed = false;
|
||||||
|
|
||||||
|
public MessageReceiver(String qname, long numberOfSamples) throws Exception
|
||||||
|
{
|
||||||
|
super("Receiver " + qname);
|
||||||
|
this.qName = qname;
|
||||||
|
this.numberOfSamples = numberOfSamples;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
LOGGER.info("Receiver: Connecting");
|
||||||
|
Connection c = cf.createConnection();
|
||||||
|
|
||||||
|
consumeMessages(c, qName);
|
||||||
|
|
||||||
|
c.close();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
failed = true;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
running.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void consumeMessages(Connection c, String qName) throws Exception;
|
||||||
|
|
||||||
|
private class MessageSender extends Thread
|
||||||
|
{
|
||||||
|
protected String qName;
|
||||||
|
|
||||||
|
public boolean failed = false;
|
||||||
|
|
||||||
|
public MessageSender(String qname) throws Exception
|
||||||
|
{
|
||||||
|
super("Sender " + qname);
|
||||||
|
|
||||||
|
this.qName = qname;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
LOGGER.info("Sender: Connecting");
|
||||||
|
Connection c = cf.createConnection();
|
||||||
|
|
||||||
|
sendMessages(c, qName);
|
||||||
|
|
||||||
|
c.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
failed = true;
|
||||||
|
if (e instanceof InterruptedException)
|
||||||
|
{
|
||||||
|
LOGGER.info("Sender done.");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This will by default send non persistent messages */
|
||||||
|
protected void sendMessages(Connection c, String qName) throws JMSException
|
||||||
|
{
|
||||||
|
Session s = null;
|
||||||
|
s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
LOGGER.info("Sender: Using AUTO-ACK session");
|
||||||
|
|
||||||
|
|
||||||
|
Queue q = s.createQueue(qName);
|
||||||
|
MessageProducer producer = s.createProducer(null);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
|
||||||
|
|
||||||
|
long sent = 0;
|
||||||
|
while (running.get())
|
||||||
|
{
|
||||||
|
beforeSend();
|
||||||
|
producer.send(q, s.createTextMessage("Message_" + (sent++)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.tests.performance.sends;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author clebertsuconic
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class ClientACKPerf extends AbstractSendReceivePerfTest
|
||||||
|
{
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "batchSize={0}")
|
||||||
|
public static Collection<Object[]> data()
|
||||||
|
{
|
||||||
|
List<Object[]> list = Arrays.asList(new Object[][]{
|
||||||
|
{1},
|
||||||
|
{2000}});
|
||||||
|
|
||||||
|
System.out.println("Size = " + list.size());
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientACKPerf(int batchSize)
|
||||||
|
{
|
||||||
|
super();
|
||||||
|
this.batchSize = batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public final int batchSize;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||||
|
{
|
||||||
|
int mode = 0;
|
||||||
|
mode = Session.CLIENT_ACKNOWLEDGE;
|
||||||
|
|
||||||
|
System.out.println("Receiver: Using PRE-ACK mode");
|
||||||
|
|
||||||
|
Session s = c.createSession(false, mode);
|
||||||
|
Queue q = s.createQueue(qName);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q, null, false);
|
||||||
|
|
||||||
|
c.start();
|
||||||
|
|
||||||
|
Message m = null;
|
||||||
|
|
||||||
|
long totalTimeACKTime = 0;
|
||||||
|
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
|
long nmessages = 0;
|
||||||
|
long timeout = System.currentTimeMillis() + 60 * 1000;
|
||||||
|
while (timeout > System.currentTimeMillis())
|
||||||
|
{
|
||||||
|
m = consumer.receive(5000);
|
||||||
|
afterConsume(m);
|
||||||
|
|
||||||
|
|
||||||
|
if (m == null)
|
||||||
|
{
|
||||||
|
throw new Exception("Failed with m = null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nmessages++ % batchSize == 0)
|
||||||
|
{
|
||||||
|
long startACK = System.nanoTime();
|
||||||
|
m.acknowledge();
|
||||||
|
long endACK = System.nanoTime();
|
||||||
|
totalTimeACKTime += (endACK - startACK);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (nmessages % 10000 == 0)
|
||||||
|
{
|
||||||
|
printMsgsSec(start, nmessages, totalTimeACKTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
printMsgsSec(start, nmessages, totalTimeACKTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime)
|
||||||
|
{
|
||||||
|
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
double elapsed = ((double) end - (double) start) / 1000f;
|
||||||
|
|
||||||
|
double messagesPerSecond = nmessages / elapsed;
|
||||||
|
double nAcks = nmessages / batchSize;
|
||||||
|
|
||||||
|
System.out.println("batchSize=" + batchSize + ", numberOfMessages="
|
||||||
|
+ nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" + String.format("%10.4f", totalTimeACKTime) +
|
||||||
|
", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.tests.performance.sends;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author clebertsuconic
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* This will by default send non persistent messages */
|
||||||
|
protected void sendMessages(Connection c, String qName) throws JMSException
|
||||||
|
{
|
||||||
|
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
|
||||||
|
|
||||||
|
long timeout = System.currentTimeMillis() + 30 * 1000;
|
||||||
|
|
||||||
|
long startMeasure = System.currentTimeMillis() + 5000;
|
||||||
|
long start = 0;
|
||||||
|
long committs = 0;
|
||||||
|
while (timeout > System.currentTimeMillis())
|
||||||
|
{
|
||||||
|
|
||||||
|
if (start == 0 && System.currentTimeMillis() > startMeasure)
|
||||||
|
{
|
||||||
|
System.out.println("heat up");
|
||||||
|
start = System.currentTimeMillis();
|
||||||
|
committs = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
s.commit();
|
||||||
|
committs++;
|
||||||
|
if (start > 0 && committs % 1000 == 0) printCommitsSecond(start, committs);
|
||||||
|
}
|
||||||
|
printCommitsSecond(start, committs);
|
||||||
|
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void printCommitsSecond(final long start, final double committs)
|
||||||
|
{
|
||||||
|
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
double elapsed = ((double) end - (double) start) / 1000f;
|
||||||
|
|
||||||
|
double commitsPerSecond = committs / elapsed;
|
||||||
|
|
||||||
|
System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
|
||||||
|
+ committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,93 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.tests.performance.sends;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.api.jms.ActiveMQJMSConstants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author clebertsuconic
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class PreACKPerf extends AbstractSendReceivePerfTest
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void consumeMessages(Connection c, String qName) throws Exception
|
||||||
|
{
|
||||||
|
int mode = 0;
|
||||||
|
mode = ActiveMQJMSConstants.PRE_ACKNOWLEDGE;
|
||||||
|
|
||||||
|
System.out.println("Receiver: Using PRE-ACK mode");
|
||||||
|
|
||||||
|
Session s = c.createSession(false, mode);
|
||||||
|
Queue q = s.createQueue(qName);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q, null, false);
|
||||||
|
|
||||||
|
c.start();
|
||||||
|
|
||||||
|
Message m = null;
|
||||||
|
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
|
long nmessages = 0;
|
||||||
|
long timeout = System.currentTimeMillis() + 30 * 1000;
|
||||||
|
while (timeout > System.currentTimeMillis())
|
||||||
|
{
|
||||||
|
m = consumer.receive(5000);
|
||||||
|
|
||||||
|
nmessages++;
|
||||||
|
|
||||||
|
if (m == null)
|
||||||
|
{
|
||||||
|
throw new Exception("Failed with m = null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nmessages % 10000 == 0)
|
||||||
|
{
|
||||||
|
printMsgsSec(start, nmessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
|
||||||
|
printMsgsSec(start, nmessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
protected void printMsgsSec(final long start, final double nmessages)
|
||||||
|
{
|
||||||
|
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
double elapsed = ((double) end - (double) start) / 1000f;
|
||||||
|
|
||||||
|
double messagesPerSecond = nmessages / elapsed;
|
||||||
|
|
||||||
|
System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
|
||||||
|
+ nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase
|
||||||
|
|
||||||
final int size = 1234;
|
final int size = 1234;
|
||||||
|
|
||||||
ActiveMQBuffer buff = conn.createBuffer(size);
|
ActiveMQBuffer buff = conn.createTransportBuffer(size);
|
||||||
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
|
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
|
||||||
Assert.assertEquals(size, buff.capacity());
|
Assert.assertEquals(size, buff.capacity());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue