ARTEMIS-2685 Not Block Netty Thread in any way for OpenWire
This commit is contained in:
parent
0624b08b77
commit
bd77a536c6
|
@ -109,7 +109,7 @@ public class ServerUtil {
|
||||||
System.out.println("**********************************");
|
System.out.println("**********************************");
|
||||||
System.out.println("Killing server " + server);
|
System.out.println("Killing server " + server);
|
||||||
System.out.println("**********************************");
|
System.out.println("**********************************");
|
||||||
server.destroy();
|
server.destroyForcibly();
|
||||||
server.waitFor();
|
server.waitFor();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.Actor;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
@ -191,6 +192,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
private ConnectionEntry connectionEntry;
|
private ConnectionEntry connectionEntry;
|
||||||
private boolean useKeepAlive;
|
private boolean useKeepAlive;
|
||||||
private long maxInactivityDuration;
|
private long maxInactivityDuration;
|
||||||
|
private Actor<Command> openWireActor;
|
||||||
|
|
||||||
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
|
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
|
||||||
|
|
||||||
|
@ -270,10 +272,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
@Override
|
@Override
|
||||||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||||
super.bufferReceived(connectionID, buffer);
|
super.bufferReceived(connectionID, buffer);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
recoverOperationContext();
|
|
||||||
|
|
||||||
Command command = (Command) inWireFormat.unmarshal(buffer);
|
Command command = (Command) inWireFormat.unmarshal(buffer);
|
||||||
|
|
||||||
// log the openwire command
|
// log the openwire command
|
||||||
|
@ -281,6 +281,23 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
traceBufferReceived(connectionID, command);
|
traceBufferReceived(connectionID, command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (openWireActor != null) {
|
||||||
|
openWireActor.act(command);
|
||||||
|
} else {
|
||||||
|
act(command);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.debug(e);
|
||||||
|
sendException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void act(Command command) {
|
||||||
|
try {
|
||||||
|
recoverOperationContext();
|
||||||
|
|
||||||
boolean responseRequired = command.isResponseRequired();
|
boolean responseRequired = command.isResponseRequired();
|
||||||
int commandId = command.getCommandId();
|
int commandId = command.getCommandId();
|
||||||
|
|
||||||
|
@ -734,6 +751,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
createInternalSession(info);
|
createInternalSession(info);
|
||||||
|
|
||||||
|
// the actor can only be used after the WireFormat has been initialized with versioning
|
||||||
|
this.openWireActor = new Actor<>(executor, this::act);
|
||||||
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,15 +25,45 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||||
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
||||||
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class SoakPagingTest extends SmokeTestBase {
|
public class SoakPagingTest extends SmokeTestBase {
|
||||||
|
|
||||||
|
String protocol;
|
||||||
|
String consumerType;
|
||||||
|
boolean transaction;
|
||||||
|
final String destination;
|
||||||
|
|
||||||
|
public SoakPagingTest(String protocol, String consumerType, boolean transaction) {
|
||||||
|
this.protocol = protocol;
|
||||||
|
this.consumerType = consumerType;
|
||||||
|
this.transaction = transaction;
|
||||||
|
|
||||||
|
if (consumerType.equals("queue")) {
|
||||||
|
destination = "exampleQueue";
|
||||||
|
} else {
|
||||||
|
destination = "exampleTopic";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}")
|
||||||
|
public static Collection<Object[]> getParams() {
|
||||||
|
return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
|
||||||
|
{"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}});
|
||||||
|
}
|
||||||
|
|
||||||
public static final String SERVER_NAME_0 = "replicated-static0";
|
public static final String SERVER_NAME_0 = "replicated-static0";
|
||||||
public static final String SERVER_NAME_1 = "replicated-static1";
|
public static final String SERVER_NAME_1 = "replicated-static1";
|
||||||
|
|
||||||
|
@ -48,27 +78,55 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
cleanupData(SERVER_NAME_1);
|
cleanupData(SERVER_NAME_1);
|
||||||
|
|
||||||
server0 = startServer(SERVER_NAME_0, 0, 30000);
|
server0 = startServer(SERVER_NAME_0, 0, 30000);
|
||||||
server1 = startServer(SERVER_NAME_1, 0, 30000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final String destination = "exampleTopic";
|
|
||||||
static final int consumer_threads = 20;
|
static final int consumer_threads = 20;
|
||||||
static final int producer_threads = 20;
|
static final int producer_threads = 20;
|
||||||
static AtomicInteger j = new AtomicInteger(0);
|
static AtomicInteger j = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
|
||||||
|
if (protocol.toUpperCase().equals("OPENWIRE")) {
|
||||||
|
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
|
||||||
|
} else if (protocol.toUpperCase().equals("AMQP")) {
|
||||||
|
|
||||||
|
if (uri.startsWith("tcp://")) {
|
||||||
|
// replacing tcp:// by amqp://
|
||||||
|
uri = "amqp" + uri.substring(3);
|
||||||
|
}
|
||||||
|
return new JmsConnectionFactory(uri);
|
||||||
|
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
|
||||||
|
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Unkown:" + protocol);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] arg) {
|
public static void main(String[] arg) {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
if (arg.length != 4) {
|
||||||
|
System.err.println("You need to pass in protocol, consumerType, Time, transaction");
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
String protocol = arg[0];
|
||||||
|
String consumerType = arg[1];
|
||||||
|
int time = Integer.parseInt(arg[2]);
|
||||||
|
boolean tx = Boolean.parseBoolean(arg[3]);
|
||||||
|
if (time == 0) {
|
||||||
|
time = 15000;
|
||||||
|
}
|
||||||
|
|
||||||
final String host = "localhost";
|
final String host = "localhost";
|
||||||
final int port = 61616;
|
final int port = 61616;
|
||||||
|
|
||||||
final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")");
|
final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port);
|
||||||
|
|
||||||
for (int i = 0; i < producer_threads; i++) {
|
for (int i = 0; i < producer_threads; i++) {
|
||||||
Thread t = new Thread(new Runnable() {
|
Thread t = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
SoakPagingTest app = new SoakPagingTest();
|
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
|
||||||
app.produce(factory);
|
app.produce(factory);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -81,36 +139,43 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
Thread t = new Thread(new Runnable() {
|
Thread t = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
SoakPagingTest app = new SoakPagingTest();
|
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
|
||||||
app.consume(factory, j.getAndIncrement());
|
app.consume(factory, j.getAndIncrement());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
t.start();
|
t.start();
|
||||||
}
|
}
|
||||||
Thread.sleep(15000);
|
Thread.sleep(time);
|
||||||
|
|
||||||
System.exit(consumed.get());
|
System.exit(consumed.get() > 0 ? 1 : 0);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
System.exit(-1);
|
System.exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPagingReplication() throws Throwable {
|
public void testPagingReplication() throws Throwable {
|
||||||
for (int i = 0; i < 3; i++) {
|
|
||||||
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
|
Process queueProcess = null;
|
||||||
Assert.assertTrue(process.waitFor() > 0);
|
if (consumerType.equals("queue")) {
|
||||||
|
queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
server1.destroy();
|
for (int i = 0; i < 3; i++) {
|
||||||
|
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction);
|
||||||
|
|
||||||
server1 = startServer(SERVER_NAME_1, 0, 30000);
|
if (i == 0) {
|
||||||
|
server1 = startServer(SERVER_NAME_1, 0, 30000);
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 2; i++) {
|
int result = process.waitFor();
|
||||||
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
|
Assert.assertTrue(result > 0);
|
||||||
Assert.assertTrue(process.waitFor() > 0);
|
}
|
||||||
|
|
||||||
|
if (queueProcess != null) {
|
||||||
|
Assert.assertTrue(queueProcess.waitFor() > 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,9 +189,22 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
Connection connection = factory.createConnection("admin", "admin");
|
Connection connection = factory.createConnection("admin", "admin");
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
|
final Session session;
|
||||||
|
|
||||||
|
if (transaction) {
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
} else {
|
||||||
|
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
Destination address;
|
||||||
|
|
||||||
|
if (consumerType.equals("queue")) {
|
||||||
|
address = session.createQueue(destination);
|
||||||
|
} else {
|
||||||
|
address = session.createTopic(destination);
|
||||||
|
}
|
||||||
|
|
||||||
Destination address = session.createTopic(destination);
|
|
||||||
MessageProducer messageProducer = session.createProducer(address);
|
MessageProducer messageProducer = session.createProducer(address);
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -142,8 +220,12 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
messageProducer.send(message);
|
messageProducer.send(message);
|
||||||
produced.incrementAndGet();
|
produced.incrementAndGet();
|
||||||
i++;
|
i++;
|
||||||
if (i % 100 == 0)
|
if (i % 100 == 0) {
|
||||||
System.out.println("Published " + i + " messages");
|
System.out.println("Published " + i + " messages");
|
||||||
|
if (transaction) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -154,11 +236,30 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
try {
|
try {
|
||||||
Connection connection = factory.createConnection("admin", "admin");
|
Connection connection = factory.createConnection("admin", "admin");
|
||||||
|
|
||||||
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
|
final Session session;
|
||||||
|
|
||||||
|
if (transaction) {
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
} else {
|
||||||
|
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
Destination address;
|
||||||
|
|
||||||
|
if (consumerType.equals("queue")) {
|
||||||
|
address = session.createQueue(destination);
|
||||||
|
} else {
|
||||||
|
address = session.createTopic(destination);
|
||||||
|
}
|
||||||
|
|
||||||
Topic address = session.createTopic(destination);
|
|
||||||
String consumerId = "ss" + (j % 5);
|
String consumerId = "ss" + (j % 5);
|
||||||
MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId);
|
MessageConsumer messageConsumer;
|
||||||
|
|
||||||
|
if (protocol.equals("shared")) {
|
||||||
|
messageConsumer = session.createSharedConsumer((Topic)address, consumerId);
|
||||||
|
} else {
|
||||||
|
messageConsumer = session.createConsumer(address);
|
||||||
|
}
|
||||||
|
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
connection.start();
|
connection.start();
|
||||||
|
@ -170,8 +271,12 @@ public class SoakPagingTest extends SmokeTestBase {
|
||||||
if (m == null)
|
if (m == null)
|
||||||
System.out.println("receive() returned null");
|
System.out.println("receive() returned null");
|
||||||
i++;
|
i++;
|
||||||
if (i % 100 == 0)
|
if (i % 100 == 0) {
|
||||||
System.out.println("Consumed " + i + " messages");
|
System.out.println("Consumed " + i + " messages");
|
||||||
|
if (transaction) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
|
Loading…
Reference in New Issue