ARTEMIS-3956 clean up use of RemotingConnection

org.apache.activemq.artemis.spi.core.protocol.RemotingConnection has a
number of implementations most notably an abstract version which
provides many methods shared among the implementations. The sharing
could be improved to eliminate duplicate code.

This commit eliminates more than 700 lines of unnecessary code.

There should be no semantic changes.
This commit is contained in:
Justin Bertram 2022-08-24 12:44:10 -05:00 committed by clebertsuconic
parent 729fdc4aab
commit a0f39a4b28
11 changed files with 75 additions and 783 deletions

View File

@ -145,6 +145,11 @@
<artifactId>jakarta.json-api</artifactId> <artifactId>jakarta.json-api</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

View File

@ -64,8 +64,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final List<Interceptor> outgoingInterceptors; private final List<Interceptor> outgoingInterceptors;
private volatile boolean destroyed;
private final boolean client; private final boolean client;
private int channelVersion; private int channelVersion;
@ -80,12 +78,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final SimpleString nodeID; private final SimpleString nodeID;
@Override
public void scheduledFlush() {
flush();
}
/* /*
* Create a client side connection * Create a client side connection
*/ */
@ -326,11 +318,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return client; return client;
} }
@Override
public boolean isDestroyed() {
return destroyed;
}
@Override @Override
public long getBlockingCallTimeout() { public long getBlockingCallTimeout() {
return blockingCallTimeout; return blockingCallTimeout;
@ -368,6 +355,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return false; return false;
} }
@Override
public boolean isSupportsFlowControl() {
return true;
}
/** /**
* Returns the name of the protocol for this Remoting Connection * Returns the name of the protocol for this Remoting Connection
* *
@ -389,8 +381,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet); logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
} }
dataReceived = true;
doBufferReceived(packet); doBufferReceived(packet);
super.bufferReceived(connectionID, buffer); super.bufferReceived(connectionID, buffer);
@ -409,11 +399,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
} }
} }
@Override
public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();
}
private void doBufferReceived(final Packet packet) { private void doBufferReceived(final Packet packet) {
if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null) { if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null) {
return; return;

View File

@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
@ -45,6 +46,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
protected final Connection transportConnection; protected final Connection transportConnection;
protected final Executor executor; protected final Executor executor;
protected final long creationTime; protected final long creationTime;
protected volatile boolean destroyed;
protected volatile boolean dataReceived; protected volatile boolean dataReceived;
private String clientId; private String clientId;
private Subject subject; private Subject subject;
@ -65,6 +67,21 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return new ArrayList<>(failureListeners); return new ArrayList<>(failureListeners);
} }
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return destroyed;
}
@Override
public void flush() {
// noop
}
@Override @Override
public boolean isWritable(ReadyListener callback) { public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback); return transportConnection.isWritable(callback);
@ -204,11 +221,6 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return res; return res;
} }
@Override
public boolean isSupportReconnect() {
return false;
}
/* /*
* This can be called concurrently by more than one thread so needs to be locked * This can be called concurrently by more than one thread so needs to be locked
*/ */
@ -239,9 +251,19 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
dataReceived = true; dataReceived = true;
} }
@Override
public void killMessage(SimpleString nodeID) {
// noop
}
@Override
public boolean isSupportReconnect() {
return false;
}
@Override @Override
public boolean isSupportsFlowControl() { public boolean isSupportsFlowControl() {
return true; return false;
} }
@Override @Override
@ -263,4 +285,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
public String getClientID() { public String getClientID() {
return clientId; return clientId;
} }
@Override
public String getTransportLocalAddress() {
return transportConnection.getLocalAddress();
}
} }

View File

@ -35,7 +35,6 @@ import javax.security.auth.Subject;
/** /**
* A RemotingConnection is a connection between a client and a server. * A RemotingConnection is a connection between a client and a server.
* *
*
* Perhaps a better name for this class now would be ProtocolConnection as this * Perhaps a better name for this class now would be ProtocolConnection as this
* represents the link with the used protocol * represents the link with the used protocol
*/ */

View File

@ -16,32 +16,19 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core.impl; package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.security.auth.Subject;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -51,7 +38,11 @@ public class ChannelImplTest {
@Before @Before
public void setUp() { public void setUp() {
channel = new ChannelImpl(new CoreRR(), 1, 4000, null); CoreRemotingConnection coreRC = Mockito.mock(CoreRemotingConnection.class);
Mockito.when(coreRC.createTransportBuffer(Packet.INITIAL_PACKET_SIZE)).thenReturn(new ChannelBufferWrapper(Unpooled.buffer(Packet.INITIAL_PACKET_SIZE)));
Connection connection = Mockito.mock(Connection.class);
Mockito.when(coreRC.getTransportConnection()).thenReturn(connection);
channel = new ChannelImpl(coreRC, 1, 4000, null);
} }
@Test @Test
@ -59,15 +50,18 @@ public class ChannelImplTest {
AtomicInteger handleResponseCount = new AtomicInteger(); AtomicInteger handleResponseCount = new AtomicInteger();
RequestPacket requestPacket = new RequestPacket((byte) 1); Packet requestPacket = Mockito.mock(Packet.class);
Mockito.when(requestPacket.isResponseAsync()).thenReturn(true);
Mockito.when(requestPacket.isRequiresResponse()).thenReturn(true);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket); channel.send(requestPacket);
assertEquals(1, channel.getCache().size()); assertEquals(1, channel.getCache().size());
ResponsePacket responsePacket = new ResponsePacket((byte) 1); Packet responsePacket = Mockito.mock(Packet.class);
responsePacket.setCorrelationID(requestPacket.getCorrelationID()); Mockito.when(responsePacket.isResponseAsync()).thenReturn(true);
Mockito.when(responsePacket.isResponse()).thenReturn(true);
channel.handlePacket(responsePacket); channel.handlePacket(responsePacket);
@ -94,7 +88,10 @@ public class ChannelImplTest {
AtomicInteger handleResponseCount = new AtomicInteger(); AtomicInteger handleResponseCount = new AtomicInteger();
RequestPacket requestPacket = new RequestPacket((byte) 1); Packet requestPacket = Mockito.mock(Packet.class);
Mockito.when(requestPacket.isResponseAsync()).thenReturn(true);
Mockito.when(requestPacket.isRequiresResponse()).thenReturn(true);
Mockito.when(requestPacket.isRequiresConfirmations()).thenReturn(true);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet()); setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket); channel.send(requestPacket);
@ -105,434 +102,4 @@ public class ChannelImplTest {
assertEquals(0, channel.getCache().size()); assertEquals(0, channel.getCache().size());
} }
class RequestPacket extends PacketImpl {
private long id;
RequestPacket(byte type) {
super(type);
}
@Override
public boolean isRequiresResponse() {
return true;
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public long getCorrelationID() {
return id;
}
@Override
public void setCorrelationID(long id) {
this.id = id;
}
@Override
public int getPacketSize() {
return 0;
}
}
class ResponsePacket extends PacketImpl {
private long id;
ResponsePacket(byte type) {
super(type);
}
@Override
public boolean isResponseAsync() {
return true;
}
@Override
public boolean isResponse() {
return true;
}
@Override
public long getCorrelationID() {
return id;
}
@Override
public void setCorrelationID(long id) {
this.id = id;
}
@Override
public int getPacketSize() {
return 0;
}
}
class CoreRR implements CoreRemotingConnection {
@Override
public int getChannelVersion() {
return 0;
}
@Override
public void setChannelVersion(int clientVersion) {
}
@Override
public Channel getChannel(long channelID, int confWindowSize) {
return null;
}
@Override
public void putChannel(long channelID, Channel channel) {
}
@Override
public boolean removeChannel(long channelID) {
return false;
}
@Override
public long generateChannelID() {
return 0;
}
@Override
public void syncIDGeneratorSequence(long id) {
}
@Override
public long getIDGeneratorSequence() {
return 0;
}
@Override
public long getBlockingCallTimeout() {
return 0;
}
@Override
public long getBlockingCallFailoverTimeout() {
return 0;
}
@Override
public Object getTransferLock() {
return null;
}
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
@Override
public boolean blockUntilWritable(long timeout) {
return false;
}
@Override
public Object getID() {
return null;
}
@Override
public long getCreationTime() {
return 0;
}
@Override
public String getRemoteAddress() {
return null;
}
@Override
public void scheduledFlush() {
}
@Override
public void addFailureListener(FailureListener listener) {
}
@Override
public boolean removeFailureListener(FailureListener listener) {
return false;
}
@Override
public void addCloseListener(CloseListener listener) {
}
@Override
public boolean removeCloseListener(CloseListener listener) {
return false;
}
@Override
public List<CloseListener> removeCloseListeners() {
return null;
}
@Override
public void setCloseListeners(List<CloseListener> listeners) {
}
@Override
public List<FailureListener> getFailureListeners() {
return null;
}
@Override
public List<FailureListener> removeFailureListeners() {
return null;
}
@Override
public void setFailureListeners(List<FailureListener> listeners) {
}
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
}
@Override
public void fail(ActiveMQException me) {
}
@Override
public Future asyncFail(ActiveMQException me) {
return null;
}
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
}
@Override
public void destroy() {
}
@Override
public Connection getTransportConnection() {
return new Connection() {
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return null;
}
@Override
public RemotingConnection getProtocolConnection() {
return null;
}
@Override
public void setProtocolConnection(RemotingConnection connection) {
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isWritable(ReadyListener listener) {
return false;
}
@Override
public void fireReady(boolean ready) {
}
@Override
public void setAutoRead(boolean autoRead) {
}
@Override
public Object getID() {
return null;
}
@Override
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
}
@Override
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
}
@Override
public void write(ActiveMQBuffer buffer,
boolean flush,
boolean batched,
ChannelFutureListener futureListener) {
}
@Override
public void write(ActiveMQBuffer buffer) {
}
@Override
public void forceClose() {
}
@Override
public void close() {
}
@Override
public String getRemoteAddress() {
return null;
}
@Override
public String getLocalAddress() {
return null;
}
@Override
public void checkFlushBatchBuffer() {
}
@Override
public TransportConfiguration getConnectorConfig() {
return null;
}
@Override
public boolean isDirectDeliver() {
return false;
}
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
@Override
public boolean isUsingProtocolHandling() {
return false;
}
@Override
public boolean isSameTarget(TransportConfiguration... configs) {
return false;
}
};
}
@Override
public boolean isClient() {
return true;
}
@Override
public boolean isDestroyed() {
return false;
}
@Override
public void disconnect(boolean criticalError) {
}
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError) {
}
@Override
public boolean checkDataReceived() {
return false;
}
@Override
public void flush() {
}
@Override
public boolean isWritable(ReadyListener callback) {
return false;
}
@Override
public void killMessage(SimpleString nodeID) {
}
@Override
public boolean isSupportReconnect() {
return false;
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
@Override
public void setSubject(Subject subject) {
}
@Override
public Subject getSubject() {
return null;
}
@Override
public String getProtocolName() {
return null;
}
@Override
public void setClientID(String cID) {
}
@Override
public String getClientID() {
return null;
}
@Override
public String getTransportLocalAddress() {
return null;
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
}
}
} }

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import javax.security.auth.Subject;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@ -30,8 +30,6 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import javax.security.auth.Subject;
/** /**
* This is a Server's Connection representation used by ActiveMQ Artemis. * This is a Server's Connection representation used by ActiveMQ Artemis.
*/ */
@ -39,8 +37,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
private final AMQPConnectionContext amqpConnection; private final AMQPConnectionContext amqpConnection;
private boolean destroyed = false;
private final ProtonProtocolManager manager; private final ProtonProtocolManager manager;
public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager, public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
@ -104,17 +100,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
callClosingListeners(); callClosingListeners();
internalClose(); internalClose();
}
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return destroyed;
} }
@Override @Override
@ -155,11 +140,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
getTransportConnection().close(); getTransportConnection().close();
} }
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
@Override @Override
public Subject getSubject() { public Subject getSubject() {
SASLResult saslResult = amqpConnection.getSASLResult(); SASLResult saslResult = amqpConnection.getSASLResult();
@ -169,6 +149,11 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
return super.getSubject(); return super.getSubject();
} }
@Override
public boolean isSupportsFlowControl() {
return true;
}
/** /**
* Returns the name of the protocol for this Remoting Connection * Returns the name of the protocol for this Remoting Connection
* *
@ -188,9 +173,4 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
amqpConnection.open(); amqpConnection.open();
} }
@Override
public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();
}
} }

View File

@ -24,7 +24,6 @@ import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -58,12 +57,7 @@ public class MQTTConnection extends AbstractRemotingConnection {
@Override @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) { public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
synchronized (failureListeners) { fail(me);
for (FailureListener listener : failureListeners) {
//FIXME(mtaylor) How do we check if the node has failed over?
listener.connectionFailed(me, false);
}
}
} }
@Override @Override
@ -88,16 +82,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
disconnect(false); disconnect(false);
} }
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return destroyed;
}
@Override @Override
public void disconnect(boolean criticalError) { public void disconnect(boolean criticalError) {
transportConnection.forceClose(); transportConnection.forceClose();
@ -129,16 +113,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
return connected; return connected;
} }
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
/** /**
* Returns the name of the protocol for this Remoting Connection * Returns the name of the protocol for this Remoting Connection
* *
@ -149,11 +123,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME + (protocolVersion != null ? protocolVersion : ""); return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME + (protocolVersion != null ? protocolVersion : "");
} }
@Override
public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();
}
public int getReceiveMaximum() { public int getReceiveMaximum() {
return receiveMaximum; return receiveMaximum;
} }

View File

@ -478,16 +478,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
fail(null, null); fail(null, null);
} }
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return destroyed;
}
@Override @Override
public void disconnect(boolean criticalError) { public void disconnect(boolean criticalError) {
this.disconnect(null, null, criticalError); this.disconnect(null, null, criticalError);
@ -1835,8 +1825,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
@Override @Override
public void killMessage(SimpleString nodeID) { public boolean isSupportsFlowControl() {
//unsupported return true;
} }
@Override @Override
@ -1849,11 +1839,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return context != null ? context.getClientId() : null; return context != null ? context.getClientId() : null;
} }
@Override
public String getTransportLocalAddress() {
return transportConnection.getLocalAddress();
}
public CoreMessageObjectPools getCoreMessageObjectPools() { public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools; return coreMessageObjectPools;
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.protocol.stomp; package org.apache.activemq.artemis.core.protocol.stomp;
import javax.security.auth.Subject;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
@ -24,14 +23,12 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -43,14 +40,13 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10; import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -63,7 +59,7 @@ import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING; import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
public final class StompConnection implements RemotingConnection { public final class StompConnection extends AbstractRemotingConnection {
private static final Logger logger = Logger.getLogger(StompConnection.class); private static final Logger logger = Logger.getLogger(StompConnection.class);
@ -72,31 +68,19 @@ public final class StompConnection implements RemotingConnection {
private final StompProtocolManager manager; private final StompProtocolManager manager;
private final Connection transportConnection;
private String login; private String login;
private String passcode; private String passcode;
private String clientID;
//this means login is valid. (stomp connection ok) //this means login is valid. (stomp connection ok)
private boolean valid; private boolean valid;
private boolean destroyed = false; private boolean destroyed = false;
private final long creationTime;
private final Acceptor acceptorUsed; private final Acceptor acceptorUsed;
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
private final Object failLock = new Object(); private final Object failLock = new Object();
private boolean dataReceived;
private final boolean enableMessageID; private final boolean enableMessageID;
private final int minLargeMessageSize; private final int minLargeMessageSize;
@ -115,23 +99,12 @@ public final class StompConnection implements RemotingConnection {
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorFactory executorFactory; private final ExecutorFactory executorFactory;
private Subject subject;
@Override
public boolean isSupportReconnect() {
return false;
}
public VersionedStompFrameHandler getStompVersionHandler() { public VersionedStompFrameHandler getStompVersionHandler() {
return frameHandler; return frameHandler;
} }
@Override
public void scheduledFlush() {
flush();
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null; StompFrame frame = null;
try { try {
@ -170,105 +143,22 @@ public final class StompConnection implements RemotingConnection {
final StompProtocolManager manager, final StompProtocolManager manager,
final ScheduledExecutorService scheduledExecutorService, final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory executorFactory) { final ExecutorFactory executorFactory) {
super(transportConnection, null);
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.transportConnection = transportConnection;
this.manager = manager; this.manager = manager;
this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory); this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory);
this.creationTime = System.currentTimeMillis();
this.acceptorUsed = acceptorUsed; this.acceptorUsed = acceptorUsed;
this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID_DEPRECATED, false, acceptorUsed.getConfiguration()) || ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration()); this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID_DEPRECATED, false, acceptorUsed.getConfiguration()) || ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration());
this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()), acceptorUsed.getConfiguration()); this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()), acceptorUsed.getConfiguration());
} }
@Override
public void addFailureListener(final FailureListener listener) {
if (listener == null) {
throw new IllegalStateException("FailureListener cannot be null");
}
failureListeners.add(listener);
}
@Override
public boolean removeFailureListener(final FailureListener listener) {
if (listener == null) {
throw new IllegalStateException("FailureListener cannot be null");
}
return failureListeners.remove(listener);
}
@Override
public void addCloseListener(final CloseListener listener) {
if (listener == null) {
throw new IllegalStateException("CloseListener cannot be null");
}
closeListeners.add(listener);
}
@Override
public boolean removeCloseListener(final CloseListener listener) {
if (listener == null) {
throw new IllegalStateException("CloseListener cannot be null");
}
return closeListeners.remove(listener);
}
@Override
public List<CloseListener> removeCloseListeners() {
List<CloseListener> ret = new ArrayList<>(closeListeners);
closeListeners.clear();
return ret;
}
@Override
public List<FailureListener> removeFailureListeners() {
List<FailureListener> ret = new ArrayList<>(failureListeners);
failureListeners.clear();
return ret;
}
@Override
public void setCloseListeners(List<CloseListener> listeners) {
closeListeners.clear();
closeListeners.addAll(listeners);
}
@Override
public void setFailureListeners(final List<FailureListener> listeners) {
failureListeners.clear();
failureListeners.addAll(listeners);
}
protected synchronized void setDataReceived() {
dataReceived = true;
}
@Override
public synchronized boolean checkDataReceived() {
boolean res = dataReceived;
dataReceived = false;
return res;
}
// TODO this should take a type - send or receive so it knows whether to check the address or the queue // TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException { public void checkDestination(String destination) throws ActiveMQStompException {
if (!manager.destinationExists(destination)) { if (!manager.destinationExists(destination)) {
@ -326,11 +216,6 @@ public final class StompConnection implements RemotingConnection {
} }
} }
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return ActiveMQBuffers.dynamicBuffer(size);
}
@Override @Override
public void destroy() { public void destroy() {
synchronized (failLock) { synchronized (failLock) {
@ -407,10 +292,6 @@ public final class StompConnection implements RemotingConnection {
fail(me); fail(me);
} }
@Override
public void flush() {
}
@Override @Override
public List<FailureListener> getFailureListeners() { public List<FailureListener> getFailureListeners() {
// we do not return the listeners otherwise the remoting service // we do not return the listeners otherwise the remoting service
@ -418,38 +299,9 @@ public final class StompConnection implements RemotingConnection {
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public Object getID() {
return transportConnection.getID();
}
@Override
public String getRemoteAddress() {
return transportConnection.getRemoteAddress();
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public Connection getTransportConnection() {
return transportConnection;
}
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return destroyed;
}
@Override @Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
super.bufferReceived(connectionID, buffer);
manager.handleBuffer(this, buffer); manager.handleBuffer(this, buffer);
} }
@ -469,16 +321,6 @@ public final class StompConnection implements RemotingConnection {
this.passcode = passcode; this.passcode = passcode;
} }
@Override
public void setClientID(String clientID) {
this.clientID = clientID;
}
@Override
public String getClientID() {
return clientID;
}
public boolean isValid() { public boolean isValid() {
return valid; return valid;
} }
@ -494,24 +336,7 @@ public final class StompConnection implements RemotingConnection {
try { try {
listener.connectionFailed(me, false); listener.connectionFailed(me, false);
} catch (final Throwable t) { } catch (final Throwable t) {
// Failure of one listener to execute shouldn't prevent others // Failure of one listener to execute shouldn't prevent others from executing
// from
// executing
ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
}
}
}
private void callClosingListeners() {
final List<CloseListener> listenersClone = new ArrayList<>(closeListeners);
for (final CloseListener listener : listenersClone) {
try {
listener.connectionClosed();
} catch (final Throwable t) {
// Failure of one listener to execute shouldn't prevent others
// from
// executing
ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t); ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
} }
} }
@ -828,7 +653,6 @@ public final class StompConnection implements RemotingConnection {
} }
ActiveMQStompProtocolLogger.LOGGER.sentErrorToClient(getTransportConnection().getRemoteAddress(), message); ActiveMQStompProtocolLogger.LOGGER.sentErrorToClient(getTransportConnection().getRemoteAddress(), message);
} }
} }
public VersionedStompFrameHandler getFrameHandler() { public VersionedStompFrameHandler getFrameHandler() {
@ -847,26 +671,6 @@ public final class StompConnection implements RemotingConnection {
return manager; return manager;
} }
@Override
public void killMessage(SimpleString nodeID) {
//unsupported
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
@Override
public void setSubject(Subject subject) {
this.subject = subject;
}
@Override
public Subject getSubject() {
return subject;
}
/** /**
* Returns the name of the protocol for this Remoting Connection * Returns the name of the protocol for this Remoting Connection
* *
@ -876,11 +680,4 @@ public final class StompConnection implements RemotingConnection {
public String getProtocolName() { public String getProtocolName() {
return StompProtocolManagerFactory.STOMP_PROTOCOL_NAME; return StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
} }
@Override
public String getTransportLocalAddress() {
// TODO Auto-generated method stub
return getTransportConnection().getLocalAddress();
}
} }

View File

@ -134,8 +134,6 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
public void handleBuffer(final RemotingConnection connection, final ActiveMQBuffer buffer) { public void handleBuffer(final RemotingConnection connection, final ActiveMQBuffer buffer) {
StompConnection conn = (StompConnection) connection; StompConnection conn = (StompConnection) connection;
conn.setDataReceived();
do { do {
StompFrame request; StompFrame request;
try { try {

View File

@ -53,12 +53,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void scheduledFlush() { public void scheduledFlush() {
} }
@Override @Override
public void addFailureListener(FailureListener listener) { public void addFailureListener(FailureListener listener) {
} }
@Override @Override
@ -68,7 +66,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void addCloseListener(CloseListener listener) { public void addCloseListener(CloseListener listener) {
} }
@Override @Override
@ -83,7 +80,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void setCloseListeners(List<CloseListener> listeners) { public void setCloseListeners(List<CloseListener> listeners) {
} }
@Override @Override
@ -98,7 +94,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void setFailureListeners(List<FailureListener> listeners) { public void setFailureListeners(List<FailureListener> listeners) {
} }
@Override @Override
@ -108,7 +103,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void fail(ActiveMQException me) { public void fail(ActiveMQException me) {
} }
@Override @Override
@ -118,12 +112,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) { public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
} }
@Override @Override
public void destroy() { public void destroy() {
} }
@Override @Override
@ -143,12 +135,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void disconnect(boolean criticalError) { public void disconnect(boolean criticalError) {
} }
@Override @Override
public void disconnect(String scaleDownNodeID, boolean criticalError) { public void disconnect(String scaleDownNodeID, boolean criticalError) {
} }
@Override @Override
@ -158,7 +148,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void flush() { public void flush() {
} }
@Override @Override
@ -168,7 +157,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void killMessage(SimpleString nodeID) { public void killMessage(SimpleString nodeID) {
} }
@Override @Override
@ -183,7 +171,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void setSubject(Subject subject) { public void setSubject(Subject subject) {
} }
@Override @Override
@ -198,7 +185,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void setClientID(String cID) { public void setClientID(String cID) {
} }
@Override @Override
@ -208,12 +194,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public String getTransportLocalAddress() { public String getTransportLocalAddress() {
return "Manaement"; return "Management";
} }
@Override @Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
} }
public SessionCallback callback = new SessionCallback() { public SessionCallback callback = new SessionCallback() {
@ -224,7 +209,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void afterDelivery() throws Exception { public void afterDelivery() throws Exception {
} }
@Override @Override
@ -234,12 +218,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void sendProducerCreditsMessage(int credits, SimpleString address) { public void sendProducerCreditsMessage(int credits, SimpleString address) {
} }
@Override @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
} }
@Override @Override
@ -271,7 +253,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) { public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
} }
@Override @Override
@ -281,7 +262,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override @Override
public void browserFinished(ServerConsumer consumer) { public void browserFinished(ServerConsumer consumer) {
} }
}; };
} }