Fixing intermittent failure on Proton tests

This commit is contained in:
Clebert Suconic 2016-01-06 20:58:52 -05:00
parent b3a8c23802
commit 1d7e8b38ea
5 changed files with 34 additions and 8 deletions

View File

@ -92,13 +92,15 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection);
ActiveMQProtonConnectionCallback connectionCallback = new ActiveMQProtonConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor());
long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
if (server.getConfiguration().getConnectionTTLOverride() != -1) {
ttl = server.getConfiguration().getConnectionTTLOverride();
}
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -46,9 +46,12 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
private final ReusableLatch latch = new ReusableLatch(0);
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) {
private final Executor closeExecutor;
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection, Executor closeExecutor) {
this.manager = manager;
this.connection = connection;
this.closeExecutor = closeExecutor;
}
@Override
@ -114,7 +117,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
@Override
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection);
return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection, closeExecutor);
}
}

View File

@ -66,14 +66,18 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private AMQPSessionContext protonSession;
private final Executor closeExecutor;
public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
Connection transportConnection) {
Connection transportConnection,
Executor executor) {
this.protonSPI = protonSPI;
this.manager = manager;
this.connection = connection;
this.transportConnection = transportConnection;
this.closeExecutor = executor;
}
@Override
@ -220,7 +224,18 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public void close() throws Exception {
serverSession.close(false);
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
serverSession.close(false);
}
catch (Exception e) {
// TODO Logger
e.printStackTrace();
}
}
});
}
@Override

View File

@ -36,6 +36,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Ignore;
import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext;
@ -52,7 +53,12 @@ import org.proton.plug.util.ByteUtil;
/**
* This is simulating a JMS client against a simple server
* This is being effectively tested by {@link org.apache.activemq.artemis.tests.integration.proton.ProtonTest} with a proper framework in place.
* This test eventually hungs on the testsuite.
* While it is still valid for debugging, for that reason the test will be ignored.
* and will be kept here for debug purposes.
*/
@Ignore // remove this to debug it
@RunWith(Parameterized.class)
public class ProtonTest extends AbstractJMSTest {

View File

@ -147,9 +147,9 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
public void close() {
System.out.println("Closing!!!");
running = false;
if (thread != null) {
if (thread != null && Thread.currentThread() != thread) {
try {
thread.join();
thread.join(1000);
}
catch (Throwable ignored) {
}