NO-JIRA Removing Thread usage for Pings on Stomp

This commit is contained in:
Clebert Suconic 2016-09-26 18:32:17 -04:00
parent d09aba4cbe
commit 1548a4e217
6 changed files with 59 additions and 46 deletions

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -44,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@ -96,6 +98,10 @@ public final class StompConnection implements RemotingConnection {
private final int minLargeMessageSize;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorFactory factory;
@Override
public boolean isSupportReconnect() {
return false;
@ -111,7 +117,7 @@ public final class StompConnection implements RemotingConnection {
case ActiveMQStompException.INVALID_EOL_V10:
if (version != null)
throw e;
frameHandler = new StompFrameHandlerV12(this);
frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory);
buffer.resetReaderIndex();
frame = decode(buffer);
break;
@ -136,12 +142,18 @@ public final class StompConnection implements RemotingConnection {
StompConnection(final Acceptor acceptorUsed,
final Connection transportConnection,
final StompProtocolManager manager) {
final StompProtocolManager manager,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory factory) {
this.scheduledExecutorService = scheduledExecutorService;
this.factory = factory;
this.transportConnection = transportConnection;
this.manager = manager;
this.frameHandler = new StompFrameHandlerV10(this);
this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory);
this.creationTime = System.currentTimeMillis();
@ -452,7 +464,7 @@ public final class StompConnection implements RemotingConnection {
}
if (this.version != (StompVersions.V1_0)) {
VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version);
VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory);
newHandler.initDecoder(this.frameHandler);
this.frameHandler = newHandler;
}

View File

@ -117,7 +117,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
@Override
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
StompConnection conn = new StompConnection(acceptorUsed, connection, this);
StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
// Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@ -37,21 +39,26 @@ public abstract class VersionedStompFrameHandler {
protected StompConnection connection;
protected StompDecoder decoder;
public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version) {
protected final ScheduledExecutorService scheduledExecutorService;
protected final ExecutorFactory executorFactory;
public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
if (version == StompVersions.V1_0) {
return new StompFrameHandlerV10(connection);
return new StompFrameHandlerV10(connection, scheduledExecutorService, executorFactory);
}
if (version == StompVersions.V1_1) {
return new StompFrameHandlerV11(connection);
return new StompFrameHandlerV11(connection, scheduledExecutorService, executorFactory);
}
if (version == StompVersions.V1_2) {
return new StompFrameHandlerV12(connection);
return new StompFrameHandlerV12(connection, scheduledExecutorService, executorFactory);
}
return null;
}
protected VersionedStompFrameHandler(StompConnection connection) {
protected VersionedStompFrameHandler(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
this.connection = connection;
this.scheduledExecutorService = scheduledExecutorService;
this.executorFactory = executorFactory;
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v10;
import javax.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@ -30,13 +31,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener {
public StompFrameHandlerV10(StompConnection connection) {
super(connection);
public StompFrameHandlerV10(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
super(connection, scheduledExecutorService, factory);
decoder = new StompDecoder(this);
decoder.init();
connection.addStompEventListener(this);

View File

@ -18,6 +18,9 @@ package org.apache.activemq.artemis.core.protocol.stomp.v11;
import javax.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@ -31,9 +34,11 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.utils.CertificateUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@ -43,8 +48,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection) {
super(connection);
public StompFrameHandlerV11(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
super(connection, scheduledExecutorService, executorFactory);
connection.addStompEventListener(this);
decoder = new StompDecoderV11(this);
decoder.init();
@ -127,19 +132,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
//client receive ping
long minAcceptInterval = Long.valueOf(params[1]);
heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
}
@Override
public StompFrame onDisconnect(StompFrame frame) {
if (this.heartBeater != null) {
heartBeater.shutdown();
try {
heartBeater.join();
}
catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.errorOnStompHeartBeat(e);
}
}
return null;
}
@ -250,7 +249,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
* (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread
* can deal with closing connections which go stale
*/
private class HeartBeater extends Thread {
private class HeartBeater extends ActiveMQScheduledComponent {
private static final int MIN_SERVER_PING = 500;
@ -260,7 +259,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
AtomicLong lastPingTimestamp = new AtomicLong(0);
ConnectionEntry connectionEntry;
private HeartBeater(final long clientPing, final long clientAcceptPing) {
private HeartBeater(ScheduledExecutorService scheduledExecutorService, Executor executor, final long clientPing, final long clientAcceptPing) {
super(scheduledExecutorService, executor, clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING, TimeUnit.MILLISECONDS, false);
if (clientAcceptPing != 0) {
serverPingPeriod = super.getPeriod();
}
connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID());
if (connectionEntry != null) {
@ -299,14 +304,11 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
}
}
if (clientAcceptPing != 0) {
serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
}
}
public synchronized void shutdown() {
shutdown = true;
this.notify();
public void shutdown() {
this.stop();
}
public void pinged() {
@ -315,21 +317,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
@Override
public void run() {
synchronized (this) {
while (!shutdown) {
long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get();
if (lastPingPeriod >= serverPingPeriod) {
lastPingTimestamp.set(System.currentTimeMillis());
connection.ping(createPingFrame());
lastPingPeriod = 0;
}
try {
this.wait(serverPingPeriod - lastPingPeriod);
}
catch (InterruptedException e) {
}
}
}
lastPingTimestamp.set(System.currentTimeMillis());
connection.ping(createPingFrame());
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@ -26,13 +28,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
public StompFrameHandlerV12(StompConnection connection) {
super(connection);
public StompFrameHandlerV12(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
super(connection, scheduledExecutorService, factory);
decoder = new StompDecoderV12(this);
decoder.init();
}