diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml
index 541ced4343..4b4cc96e50 100644
--- a/artemis-core-client/pom.xml
+++ b/artemis-core-client/pom.xml
@@ -145,6 +145,11 @@
jakarta.json-api
test
+
+ org.mockito
+ mockito-core
+ test
+
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 09607321a0..f633ab56f4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -64,8 +64,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final List outgoingInterceptors;
- private volatile boolean destroyed;
-
private final boolean client;
private int channelVersion;
@@ -80,12 +78,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final SimpleString nodeID;
- @Override
- public void scheduledFlush() {
- flush();
- }
-
-
/*
* Create a client side connection
*/
@@ -326,11 +318,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return client;
}
- @Override
- public boolean isDestroyed() {
- return destroyed;
- }
-
@Override
public long getBlockingCallTimeout() {
return blockingCallTimeout;
@@ -368,6 +355,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return false;
}
+ @Override
+ public boolean isSupportsFlowControl() {
+ return true;
+ }
+
/**
* Returns the name of the protocol for this Remoting Connection
*
@@ -389,8 +381,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
logger.trace("RemotingConnectionID=" + getID() + " handling packet " + packet);
}
- dataReceived = true;
-
doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
@@ -409,11 +399,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
}
- @Override
- public String getTransportLocalAddress() {
- return getTransportConnection().getLocalAddress();
- }
-
private void doBufferReceived(final Packet packet) {
if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null) {
return;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 1e54913a27..401a021ebe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -45,6 +46,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
protected final Connection transportConnection;
protected final Executor executor;
protected final long creationTime;
+ protected volatile boolean destroyed;
protected volatile boolean dataReceived;
private String clientId;
private Subject subject;
@@ -65,6 +67,21 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return new ArrayList<>(failureListeners);
}
+ @Override
+ public boolean isClient() {
+ return false;
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return destroyed;
+ }
+
+ @Override
+ public void flush() {
+ // noop
+ }
+
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
@@ -204,11 +221,6 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return res;
}
- @Override
- public boolean isSupportReconnect() {
- return false;
- }
-
/*
* This can be called concurrently by more than one thread so needs to be locked
*/
@@ -239,9 +251,19 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
dataReceived = true;
}
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ // noop
+ }
+
+ @Override
+ public boolean isSupportReconnect() {
+ return false;
+ }
+
@Override
public boolean isSupportsFlowControl() {
- return true;
+ return false;
}
@Override
@@ -263,4 +285,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
public String getClientID() {
return clientId;
}
+
+ @Override
+ public String getTransportLocalAddress() {
+ return transportConnection.getLocalAddress();
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 3b4161a453..06b61e3ece 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -35,7 +35,6 @@ import javax.security.auth.Subject;
/**
* A RemotingConnection is a connection between a client and a server.
*
- *
* Perhaps a better name for this class now would be ProtocolConnection as this
* represents the link with the used protocol
*/
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 2784972863..9442c62d41 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -16,32 +16,19 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
-import javax.security.auth.Subject;
-import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
@@ -51,7 +38,11 @@ public class ChannelImplTest {
@Before
public void setUp() {
- channel = new ChannelImpl(new CoreRR(), 1, 4000, null);
+ CoreRemotingConnection coreRC = Mockito.mock(CoreRemotingConnection.class);
+ Mockito.when(coreRC.createTransportBuffer(Packet.INITIAL_PACKET_SIZE)).thenReturn(new ChannelBufferWrapper(Unpooled.buffer(Packet.INITIAL_PACKET_SIZE)));
+ Connection connection = Mockito.mock(Connection.class);
+ Mockito.when(coreRC.getTransportConnection()).thenReturn(connection);
+ channel = new ChannelImpl(coreRC, 1, 4000, null);
}
@Test
@@ -59,15 +50,18 @@ public class ChannelImplTest {
AtomicInteger handleResponseCount = new AtomicInteger();
- RequestPacket requestPacket = new RequestPacket((byte) 1);
+ Packet requestPacket = Mockito.mock(Packet.class);
+ Mockito.when(requestPacket.isResponseAsync()).thenReturn(true);
+ Mockito.when(requestPacket.isRequiresResponse()).thenReturn(true);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket);
assertEquals(1, channel.getCache().size());
- ResponsePacket responsePacket = new ResponsePacket((byte) 1);
- responsePacket.setCorrelationID(requestPacket.getCorrelationID());
+ Packet responsePacket = Mockito.mock(Packet.class);
+ Mockito.when(responsePacket.isResponseAsync()).thenReturn(true);
+ Mockito.when(responsePacket.isResponse()).thenReturn(true);
channel.handlePacket(responsePacket);
@@ -94,7 +88,10 @@ public class ChannelImplTest {
AtomicInteger handleResponseCount = new AtomicInteger();
- RequestPacket requestPacket = new RequestPacket((byte) 1);
+ Packet requestPacket = Mockito.mock(Packet.class);
+ Mockito.when(requestPacket.isResponseAsync()).thenReturn(true);
+ Mockito.when(requestPacket.isRequiresResponse()).thenReturn(true);
+ Mockito.when(requestPacket.isRequiresConfirmations()).thenReturn(true);
setResponseHandlerAsPerActiveMQSessionContext((packet, response) -> handleResponseCount.incrementAndGet());
channel.send(requestPacket);
@@ -105,434 +102,4 @@ public class ChannelImplTest {
assertEquals(0, channel.getCache().size());
}
-
- class RequestPacket extends PacketImpl {
-
- private long id;
-
- RequestPacket(byte type) {
- super(type);
- }
-
- @Override
- public boolean isRequiresResponse() {
- return true;
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public long getCorrelationID() {
- return id;
- }
-
- @Override
- public void setCorrelationID(long id) {
- this.id = id;
- }
-
- @Override
- public int getPacketSize() {
- return 0;
- }
- }
-
- class ResponsePacket extends PacketImpl {
-
- private long id;
-
- ResponsePacket(byte type) {
- super(type);
- }
-
- @Override
- public boolean isResponseAsync() {
- return true;
- }
-
- @Override
- public boolean isResponse() {
- return true;
- }
-
- @Override
- public long getCorrelationID() {
- return id;
- }
-
- @Override
- public void setCorrelationID(long id) {
- this.id = id;
- }
-
- @Override
- public int getPacketSize() {
- return 0;
- }
- }
-
- class CoreRR implements CoreRemotingConnection {
-
- @Override
- public int getChannelVersion() {
- return 0;
- }
-
- @Override
- public void setChannelVersion(int clientVersion) {
-
- }
-
- @Override
- public Channel getChannel(long channelID, int confWindowSize) {
- return null;
- }
-
- @Override
- public void putChannel(long channelID, Channel channel) {
-
- }
-
- @Override
- public boolean removeChannel(long channelID) {
- return false;
- }
-
- @Override
- public long generateChannelID() {
- return 0;
- }
-
- @Override
- public void syncIDGeneratorSequence(long id) {
-
- }
-
- @Override
- public long getIDGeneratorSequence() {
- return 0;
- }
-
- @Override
- public long getBlockingCallTimeout() {
- return 0;
- }
-
- @Override
- public long getBlockingCallFailoverTimeout() {
- return 0;
- }
-
- @Override
- public Object getTransferLock() {
- return null;
- }
-
- @Override
- public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
- return null;
- }
-
- @Override
- public boolean blockUntilWritable(long timeout) {
- return false;
- }
-
- @Override
- public Object getID() {
- return null;
- }
-
- @Override
- public long getCreationTime() {
- return 0;
- }
-
- @Override
- public String getRemoteAddress() {
- return null;
- }
-
- @Override
- public void scheduledFlush() {
-
- }
-
- @Override
- public void addFailureListener(FailureListener listener) {
-
- }
-
- @Override
- public boolean removeFailureListener(FailureListener listener) {
- return false;
- }
-
- @Override
- public void addCloseListener(CloseListener listener) {
-
- }
-
- @Override
- public boolean removeCloseListener(CloseListener listener) {
- return false;
- }
-
- @Override
- public List removeCloseListeners() {
- return null;
- }
-
- @Override
- public void setCloseListeners(List listeners) {
-
- }
-
- @Override
- public List getFailureListeners() {
- return null;
- }
-
- @Override
- public List removeFailureListeners() {
- return null;
- }
-
- @Override
- public void setFailureListeners(List listeners) {
-
- }
-
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return new ChannelBufferWrapper(Unpooled.buffer(size));
- }
-
- @Override
- public void fail(ActiveMQException me) {
-
- }
-
- @Override
- public Future asyncFail(ActiveMQException me) {
- return null;
- }
-
- @Override
- public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
-
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public Connection getTransportConnection() {
- return new Connection() {
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return null;
- }
-
- @Override
- public RemotingConnection getProtocolConnection() {
- return null;
- }
-
- @Override
- public void setProtocolConnection(RemotingConnection connection) {
-
- }
-
- @Override
- public boolean isOpen() {
- return true;
- }
-
- @Override
- public boolean isWritable(ReadyListener listener) {
- return false;
- }
-
- @Override
- public void fireReady(boolean ready) {
-
- }
-
- @Override
- public void setAutoRead(boolean autoRead) {
-
- }
-
- @Override
- public Object getID() {
- return null;
- }
-
- @Override
- public void write(ActiveMQBuffer buffer, boolean requestFlush) {
-
- }
-
- @Override
- public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
-
- }
-
- @Override
- public void write(ActiveMQBuffer buffer,
- boolean flush,
- boolean batched,
- ChannelFutureListener futureListener) {
-
- }
-
- @Override
- public void write(ActiveMQBuffer buffer) {
-
- }
-
- @Override
- public void forceClose() {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public String getRemoteAddress() {
- return null;
- }
-
- @Override
- public String getLocalAddress() {
- return null;
- }
-
- @Override
- public void checkFlushBatchBuffer() {
-
- }
-
- @Override
- public TransportConfiguration getConnectorConfig() {
- return null;
- }
-
- @Override
- public boolean isDirectDeliver() {
- return false;
- }
-
- @Override
- public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
- return null;
- }
-
- @Override
- public boolean isUsingProtocolHandling() {
- return false;
- }
-
- @Override
- public boolean isSameTarget(TransportConfiguration... configs) {
- return false;
- }
- };
- }
-
- @Override
- public boolean isClient() {
- return true;
- }
-
- @Override
- public boolean isDestroyed() {
- return false;
- }
-
- @Override
- public void disconnect(boolean criticalError) {
-
- }
-
- @Override
- public void disconnect(String scaleDownNodeID, boolean criticalError) {
-
- }
-
- @Override
- public boolean checkDataReceived() {
- return false;
- }
-
- @Override
- public void flush() {
-
- }
-
- @Override
- public boolean isWritable(ReadyListener callback) {
- return false;
- }
-
- @Override
- public void killMessage(SimpleString nodeID) {
-
- }
-
- @Override
- public boolean isSupportReconnect() {
- return false;
- }
-
- @Override
- public boolean isSupportsFlowControl() {
- return false;
- }
-
- @Override
- public void setSubject(Subject subject) {
-
- }
-
- @Override
- public Subject getSubject() {
- return null;
- }
-
- @Override
- public String getProtocolName() {
- return null;
- }
-
- @Override
- public void setClientID(String cID) {
-
- }
-
- @Override
- public String getClientID() {
- return null;
- }
-
- @Override
- public String getTransportLocalAddress() {
- return null;
- }
-
- @Override
- public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
-
- }
- }
-
}
\ No newline at end of file
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 90313e1dfd..69740abff7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -16,12 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
+import javax.security.auth.Subject;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@@ -30,8 +30,6 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import javax.security.auth.Subject;
-
/**
* This is a Server's Connection representation used by ActiveMQ Artemis.
*/
@@ -39,8 +37,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
private final AMQPConnectionContext amqpConnection;
- private boolean destroyed = false;
-
private final ProtonProtocolManager manager;
public ActiveMQProtonRemotingConnection(ProtonProtocolManager manager,
@@ -104,17 +100,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
callClosingListeners();
internalClose();
-
- }
-
- @Override
- public boolean isClient() {
- return false;
- }
-
- @Override
- public boolean isDestroyed() {
- return destroyed;
}
@Override
@@ -155,11 +140,6 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
getTransportConnection().close();
}
- @Override
- public void killMessage(SimpleString nodeID) {
- //unsupported
- }
-
@Override
public Subject getSubject() {
SASLResult saslResult = amqpConnection.getSASLResult();
@@ -169,6 +149,11 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
return super.getSubject();
}
+ @Override
+ public boolean isSupportsFlowControl() {
+ return true;
+ }
+
/**
* Returns the name of the protocol for this Remoting Connection
*
@@ -188,9 +173,4 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
amqpConnection.open();
}
- @Override
- public String getTransportLocalAddress() {
- return getTransportConnection().getLocalAddress();
- }
-
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 8a98e69dce..981fa3efbf 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -24,7 +24,6 @@ import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -58,12 +57,7 @@ public class MQTTConnection extends AbstractRemotingConnection {
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
- synchronized (failureListeners) {
- for (FailureListener listener : failureListeners) {
- //FIXME(mtaylor) How do we check if the node has failed over?
- listener.connectionFailed(me, false);
- }
- }
+ fail(me);
}
@Override
@@ -88,16 +82,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
disconnect(false);
}
- @Override
- public boolean isClient() {
- return false;
- }
-
- @Override
- public boolean isDestroyed() {
- return destroyed;
- }
-
@Override
public void disconnect(boolean criticalError) {
transportConnection.forceClose();
@@ -129,16 +113,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
return connected;
}
- @Override
- public void killMessage(SimpleString nodeID) {
- //unsupported
- }
-
- @Override
- public boolean isSupportsFlowControl() {
- return false;
- }
-
/**
* Returns the name of the protocol for this Remoting Connection
*
@@ -149,11 +123,6 @@ public class MQTTConnection extends AbstractRemotingConnection {
return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME + (protocolVersion != null ? protocolVersion : "");
}
- @Override
- public String getTransportLocalAddress() {
- return getTransportConnection().getLocalAddress();
- }
-
public int getReceiveMaximum() {
return receiveMaximum;
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 8a1b89c3f7..08edf87b40 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -478,16 +478,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
fail(null, null);
}
- @Override
- public boolean isClient() {
- return false;
- }
-
- @Override
- public boolean isDestroyed() {
- return destroyed;
- }
-
@Override
public void disconnect(boolean criticalError) {
this.disconnect(null, null, criticalError);
@@ -1835,8 +1825,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
@Override
- public void killMessage(SimpleString nodeID) {
- //unsupported
+ public boolean isSupportsFlowControl() {
+ return true;
}
@Override
@@ -1849,11 +1839,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return context != null ? context.getClientId() : null;
}
- @Override
- public String getTransportLocalAddress() {
- return transportConnection.getLocalAddress();
- }
-
public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index d2e4b551a9..93cfcda3d2 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
-import javax.security.auth.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -24,14 +23,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -43,14 +40,13 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -63,7 +59,7 @@ import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
-public final class StompConnection implements RemotingConnection {
+public final class StompConnection extends AbstractRemotingConnection {
private static final Logger logger = Logger.getLogger(StompConnection.class);
@@ -72,31 +68,19 @@ public final class StompConnection implements RemotingConnection {
private final StompProtocolManager manager;
- private final Connection transportConnection;
-
private String login;
private String passcode;
- private String clientID;
-
//this means login is valid. (stomp connection ok)
private boolean valid;
private boolean destroyed = false;
- private final long creationTime;
-
private final Acceptor acceptorUsed;
- private final List failureListeners = new CopyOnWriteArrayList<>();
-
- private final List closeListeners = new CopyOnWriteArrayList<>();
-
private final Object failLock = new Object();
- private boolean dataReceived;
-
private final boolean enableMessageID;
private final int minLargeMessageSize;
@@ -115,23 +99,12 @@ public final class StompConnection implements RemotingConnection {
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorFactory executorFactory;
- private Subject subject;
-
- @Override
- public boolean isSupportReconnect() {
- return false;
- }
public VersionedStompFrameHandler getStompVersionHandler() {
return frameHandler;
}
- @Override
- public void scheduledFlush() {
- flush();
- }
-
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null;
try {
@@ -170,105 +143,22 @@ public final class StompConnection implements RemotingConnection {
final StompProtocolManager manager,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory executorFactory) {
+ super(transportConnection, null);
+
this.scheduledExecutorService = scheduledExecutorService;
this.executorFactory = executorFactory;
- this.transportConnection = transportConnection;
-
this.manager = manager;
this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory);
- this.creationTime = System.currentTimeMillis();
-
this.acceptorUsed = acceptorUsed;
this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID_DEPRECATED, false, acceptorUsed.getConfiguration()) || ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration());
this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()), acceptorUsed.getConfiguration());
}
- @Override
- public void addFailureListener(final FailureListener listener) {
- if (listener == null) {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- failureListeners.add(listener);
- }
-
- @Override
- public boolean removeFailureListener(final FailureListener listener) {
- if (listener == null) {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- return failureListeners.remove(listener);
- }
-
- @Override
- public void addCloseListener(final CloseListener listener) {
- if (listener == null) {
- throw new IllegalStateException("CloseListener cannot be null");
- }
-
- closeListeners.add(listener);
- }
-
- @Override
- public boolean removeCloseListener(final CloseListener listener) {
- if (listener == null) {
- throw new IllegalStateException("CloseListener cannot be null");
- }
-
- return closeListeners.remove(listener);
- }
-
- @Override
- public List removeCloseListeners() {
- List ret = new ArrayList<>(closeListeners);
-
- closeListeners.clear();
-
- return ret;
- }
-
- @Override
- public List removeFailureListeners() {
- List ret = new ArrayList<>(failureListeners);
-
- failureListeners.clear();
-
- return ret;
- }
-
- @Override
- public void setCloseListeners(List listeners) {
- closeListeners.clear();
-
- closeListeners.addAll(listeners);
- }
-
- @Override
- public void setFailureListeners(final List listeners) {
- failureListeners.clear();
-
- failureListeners.addAll(listeners);
- }
-
- protected synchronized void setDataReceived() {
- dataReceived = true;
- }
-
- @Override
- public synchronized boolean checkDataReceived() {
- boolean res = dataReceived;
-
- dataReceived = false;
-
- return res;
- }
-
// TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException {
if (!manager.destinationExists(destination)) {
@@ -326,11 +216,6 @@ public final class StompConnection implements RemotingConnection {
}
}
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return ActiveMQBuffers.dynamicBuffer(size);
- }
-
@Override
public void destroy() {
synchronized (failLock) {
@@ -407,10 +292,6 @@ public final class StompConnection implements RemotingConnection {
fail(me);
}
- @Override
- public void flush() {
- }
-
@Override
public List getFailureListeners() {
// we do not return the listeners otherwise the remoting service
@@ -418,38 +299,9 @@ public final class StompConnection implements RemotingConnection {
return Collections.emptyList();
}
- @Override
- public Object getID() {
- return transportConnection.getID();
- }
-
- @Override
- public String getRemoteAddress() {
- return transportConnection.getRemoteAddress();
- }
-
- @Override
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public Connection getTransportConnection() {
- return transportConnection;
- }
-
- @Override
- public boolean isClient() {
- return false;
- }
-
- @Override
- public boolean isDestroyed() {
- return destroyed;
- }
-
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+ super.bufferReceived(connectionID, buffer);
manager.handleBuffer(this, buffer);
}
@@ -469,16 +321,6 @@ public final class StompConnection implements RemotingConnection {
this.passcode = passcode;
}
- @Override
- public void setClientID(String clientID) {
- this.clientID = clientID;
- }
-
- @Override
- public String getClientID() {
- return clientID;
- }
-
public boolean isValid() {
return valid;
}
@@ -494,24 +336,7 @@ public final class StompConnection implements RemotingConnection {
try {
listener.connectionFailed(me, false);
} catch (final Throwable t) {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
- }
- }
- }
-
- private void callClosingListeners() {
- final List listenersClone = new ArrayList<>(closeListeners);
-
- for (final CloseListener listener : listenersClone) {
- try {
- listener.connectionClosed();
- } catch (final Throwable t) {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
+ // Failure of one listener to execute shouldn't prevent others from executing
ActiveMQServerLogger.LOGGER.errorCallingFailureListener(t);
}
}
@@ -828,7 +653,6 @@ public final class StompConnection implements RemotingConnection {
}
ActiveMQStompProtocolLogger.LOGGER.sentErrorToClient(getTransportConnection().getRemoteAddress(), message);
}
-
}
public VersionedStompFrameHandler getFrameHandler() {
@@ -847,26 +671,6 @@ public final class StompConnection implements RemotingConnection {
return manager;
}
- @Override
- public void killMessage(SimpleString nodeID) {
- //unsupported
- }
-
- @Override
- public boolean isSupportsFlowControl() {
- return false;
- }
-
- @Override
- public void setSubject(Subject subject) {
- this.subject = subject;
- }
-
- @Override
- public Subject getSubject() {
- return subject;
- }
-
/**
* Returns the name of the protocol for this Remoting Connection
*
@@ -876,11 +680,4 @@ public final class StompConnection implements RemotingConnection {
public String getProtocolName() {
return StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
}
-
- @Override
- public String getTransportLocalAddress() {
- // TODO Auto-generated method stub
- return getTransportConnection().getLocalAddress();
- }
-
}
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 7ba235138c..ad77a8a293 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -134,8 +134,6 @@ public class StompProtocolManager extends AbstractProtocolManager listeners) {
-
}
@Override
@@ -98,7 +94,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void setFailureListeners(List listeners) {
-
}
@Override
@@ -108,7 +103,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void fail(ActiveMQException me) {
-
}
@Override
@@ -118,12 +112,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
-
}
@Override
public void destroy() {
-
}
@Override
@@ -143,12 +135,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void disconnect(boolean criticalError) {
-
}
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError) {
-
}
@Override
@@ -158,7 +148,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void flush() {
-
}
@Override
@@ -168,7 +157,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void killMessage(SimpleString nodeID) {
-
}
@Override
@@ -183,7 +171,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void setSubject(Subject subject) {
-
}
@Override
@@ -198,7 +185,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void setClientID(String cID) {
-
}
@Override
@@ -208,12 +194,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public String getTransportLocalAddress() {
- return "Manaement";
+ return "Management";
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
-
}
public SessionCallback callback = new SessionCallback() {
@@ -224,7 +209,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void afterDelivery() throws Exception {
-
}
@Override
@@ -234,12 +218,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
-
}
@Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
-
}
@Override
@@ -271,7 +253,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
-
}
@Override
@@ -281,7 +262,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
@Override
public void browserFinished(ServerConsumer consumer) {
-
}
};
}