diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index c9c6d75926..f3fd2e1f69 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -44,6 +44,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final int id;
+
private final SimpleString address;
private final ClientSessionInternal session;
@@ -75,7 +77,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
final boolean autoGroup,
final SimpleString groupID,
final int minLargeMessageSize,
- final SessionContext sessionContext) {
+ final SessionContext sessionContext,
+ final int producerID) {
+ this.id = producerID;
+
this.sessionContext = sessionContext;
this.session = session;
@@ -197,6 +202,11 @@ public class ClientProducerImpl implements ClientProducerInternal {
return producerCredits;
}
+ @Override
+ public int getID() {
+ return id;
+ }
+
private void doCleanup() {
if (address != null) {
session.returnCredits(address);
@@ -204,6 +214,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.removeProducer(this);
+ sessionContext.removeProducer(id);
+
closed = true;
}
@@ -290,7 +302,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
theCredits.acquireCredits(creditSize);
- sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
+ sessionContext.sendFullMessage(msgI, sendBlocking, handler, address, id);
}
private void checkClosed() throws ActiveMQException {
@@ -381,7 +393,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
- int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
+ int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), id, messageHandler);
credits.acquireCredits(creditsUsed);
}
@@ -492,7 +504,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
} else {
@@ -501,7 +513,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
index 6190885e3d..963f86fdbe 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerInternal.java
@@ -26,4 +26,6 @@ public interface ClientProducerInternal extends ClientProducer {
void cleanUp();
ClientProducerCredits getProducerCredits();
+
+ int getID();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index dfb99e65ee..e388673645 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -161,6 +161,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+ private AtomicInteger producerIDs = new AtomicInteger();
+
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
final String username,
@@ -2031,10 +2033,20 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final int maxRate) throws ActiveMQException {
checkClosed();
- ClientProducerInternal producer = new ClientProducerImpl(this, address, maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false), autoCommitSends && blockOnNonDurableSend, autoCommitSends && blockOnDurableSend, autoGroup, groupID == null ? null : new SimpleString(groupID), minLargeMessageSize, sessionContext);
+ ClientProducerInternal producer = new ClientProducerImpl(this,
+ address,
+ maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
+ autoCommitSends && blockOnNonDurableSend,
+ autoCommitSends && blockOnDurableSend,
+ autoGroup, groupID == null ? null : new SimpleString(groupID),
+ minLargeMessageSize,
+ sessionContext,
+ producerIDs.incrementAndGet());
addProducer(producer);
+ sessionContext.createProducer(producer);
+
return producer;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 43035b0520..3f8699b5b8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -76,6 +76,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
}
+ default boolean isBeforeProducerMetricsChanged() {
+ int version = getChannelVersion();
+ return version < PacketImpl.ARTEMIS_2_28_0_VERSION;
+ }
+
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 7f248b7403..b864dc1013 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -76,6 +78,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RemoveProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
@@ -105,10 +108,12 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -371,6 +376,20 @@ public class ActiveMQSessionContext extends SessionContext {
return remotingConnection.isWritable(callback);
}
+ @Override
+ public void createProducer(ClientProducerInternal producer) {
+ if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
+ sessionChannel.send(new CreateProducerMessage(producer.getID(), producer.getAddress()));
+ }
+ }
+
+ @Override
+ public void removeProducer(int id) {
+ if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
+ sessionChannel.send(new RemoveProducerMessage(id));
+ }
+ }
+
@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
@@ -546,15 +565,19 @@ public class ActiveMQSessionContext extends SessionContext {
public void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
- SimpleString defaultAddress) throws ActiveMQException {
+ SimpleString defaultAddress,
+ int senderID) throws ActiveMQException {
final SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
} else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendMessage(msgI, sendBlocking, handler);
- } else {
+ } else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
+ } else {
+ boolean responseRequired = confirmationWindow != -1 || sendBlocking;
+ packet = new SessionSendMessage_V3(msgI, responseRequired, handler, senderID);
}
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
@@ -579,8 +602,9 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk,
byte[] chunk,
int reconnectID,
+ int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
+ return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}
@Override
@@ -589,8 +613,9 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
+ int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
+ return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}
@Override
@@ -1043,13 +1068,16 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
+ int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
- } else {
+ } else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
+ } else {
+ chunkPacket = new SessionSendContinuationMessage_V3(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, senderID, messageHandler);
}
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 2967c970e9..6b71951cd3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@@ -423,8 +424,10 @@ public abstract class PacketDecoder implements Serializable {
case SESS_SEND_CONTINUATION: {
if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendContinuationMessage();
- } else {
+ } else if (connection.isBeforeProducerMetricsChanged()) {
packet = new SessionSendContinuationMessage_V2();
+ } else {
+ packet = new SessionSendContinuationMessage_V3();
}
break;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index b83e3ee849..00020faecc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -47,6 +47,9 @@ public class PacketImpl implements Packet {
// 2.24.0
public static final int ARTEMIS_2_24_0_VERSION = 133;
+ // 2.28.0
+ public static final int ARTEMIS_2_28_0_VERSION = 134;
+
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
@@ -293,7 +296,9 @@ public class PacketImpl implements Packet {
public static final byte DISCONNECT_V3 = -19;
+ public static final byte CREATE_PRODUCER = -20;
+ public static final byte REMOVE_PRODUCER = -21;
public PacketImpl(final byte type) {
this.type = type;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateProducerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateProducerMessage.java
new file mode 100644
index 0000000000..ead9a9594b
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateProducerMessage.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+import java.util.Objects;
+
+public class CreateProducerMessage extends PacketImpl {
+ protected int id;
+
+ protected SimpleString address;
+
+ public CreateProducerMessage() {
+ super(PacketImpl.CREATE_PRODUCER);
+ }
+
+ public CreateProducerMessage(int id, SimpleString address) {
+ super(PacketImpl.CREATE_PRODUCER);
+ this.id = id;
+ this.address = address;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public SimpleString getAddress() {
+ return address;
+ }
+
+ public void setAddress(SimpleString address) {
+ this.address = address;
+ }
+
+
+
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ buffer.writeInt(id);
+ buffer.writeNullableSimpleString(address);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ id = buffer.readInt();
+ address = buffer.readNullableSimpleString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ CreateProducerMessage that = (CreateProducerMessage) o;
+ return Objects.equals(id, that.id) && Objects.equals(address, that.address);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), id, address);
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/RemoveProducerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/RemoveProducerMessage.java
new file mode 100644
index 0000000000..a6f8d7755c
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/RemoveProducerMessage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+import java.util.Objects;
+
+public class RemoveProducerMessage extends PacketImpl {
+
+ protected int id;
+
+ public RemoveProducerMessage(int id) {
+ super(PacketImpl.REMOVE_PRODUCER);
+ this.id = id;
+ }
+
+ public RemoveProducerMessage() {
+ super(PacketImpl.REMOVE_PRODUCER);
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ buffer.writeInt(id);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ id = buffer.readInt();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ RemoveProducerMessage that = (RemoveProducerMessage) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), id);
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 1aa521650e..1db5ff7f47 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -180,4 +180,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
public SendAcknowledgementHandler getHandler() {
return handler;
}
+
+ public int getSenderID() {
+ return -1;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V3.java
new file mode 100644
index 0000000000..60db0c2d92
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V3.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage
+ */
+public class SessionSendContinuationMessage_V3 extends SessionSendContinuationMessage_V2 {
+
+ private int senderID;
+
+ public SessionSendContinuationMessage_V3() {
+ super();
+ }
+
+ /**
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionSendContinuationMessage_V3(final Message message,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse,
+ final long messageBodySize,
+ final int senderID,
+ final SendAcknowledgementHandler handler) {
+ super(message, body, continues, requiresResponse, messageBodySize, handler);
+ this.senderID = senderID;
+ }
+
+ @Override
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + DataConstants.SIZE_INT;
+ }
+
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ super.encodeRest(buffer);
+ buffer.writeInt(senderID);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ super.decodeRest(buffer);
+ senderID = buffer.readInt();
+ }
+
+ @Override
+ public int getSenderID() {
+ return senderID;
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index dafcb8fc6d..206af04c77 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -136,4 +136,7 @@ public class SessionSendMessage extends MessagePacket {
return true;
}
+ public int getSenderID() {
+ return -1;
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V3.java
new file mode 100644
index 0000000000..690b931f62
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage_V3.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class SessionSendMessage_V3 extends SessionSendMessage_V2 {
+
+ private int senderID;
+
+ /** This will be using the CoreMessage because it is meant for the core-protocol */
+ public SessionSendMessage_V3(final ICoreMessage message,
+ final boolean requiresResponse,
+ final SendAcknowledgementHandler handler,
+ final int senderID) {
+ super(message, requiresResponse, handler);
+ this.senderID = senderID;
+ }
+
+ public SessionSendMessage_V3(final CoreMessage message) {
+ super(message);
+ }
+
+ @Override
+ public void encodeRest(ActiveMQBuffer buffer) {
+ super.encodeRest(buffer);
+ buffer.writeInt(senderID);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ super.decodeRest(buffer);
+ senderID = buffer.readInt();
+ }
+
+ @Override
+ protected int fieldsEncodeSize() {
+ return super.fieldsEncodeSize() + DataConstants.SIZE_INT;
+ }
+
+ @Override
+ public int getSenderID() {
+ return senderID;
+ }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index cbe9dd8b54..7c33fff290 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -142,7 +143,8 @@ public abstract class SessionContext {
public abstract void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
- SimpleString defaultAddress) throws ActiveMQException;
+ SimpleString defaultAddress,
+ int senderID) throws ActiveMQException;
/**
* it should return the number of credits (or bytes) used to send this packet
@@ -159,6 +161,7 @@ public abstract class SessionContext {
boolean lastChunk,
byte[] chunk,
int reconnectID,
+ int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract int sendServerLargeMessageChunk(Message msgI,
@@ -166,6 +169,7 @@ public abstract class SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
+ int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
@@ -385,4 +389,8 @@ public abstract class SessionContext {
public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
+
+ public abstract void createProducer(ClientProducerInternal producer);
+
+ public abstract void removeProducer(int id);
}
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index 8ca160b89e..7295b14231 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134
diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
index 8312e44b53..3da804108e 100644
--- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
+++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
@@ -98,7 +98,15 @@ var Artemis;
{name: "Address", visible: true},
{name: "Remote Address", visible: true},
{name: "Local Address", visible: true},
- {name: "Creation Time", visible: true}
+ {name: "Creation Time", visible: true},
+ {name: "Messages in Transit", visible: false},
+ {name: "Messages in Transit Size", visible: false},
+ {name: "Messages Delivered", visible: false},
+ {name: "Messages Delivered Size", visible: false},
+ {name: "Messages Acknowledged", visible: false},
+ {name: "Messages Acknowledged awaiting Commit", visible: false},
+ {name: "Last Delivered Time", visible: false},
+ {name: "Last Acknowledged Time", visible: false}
]
};
@@ -130,7 +138,13 @@ var Artemis;
{id: 'queue', name: 'Queue'},
{id: 'protocol', name: 'Protocol'},
{id: 'localAddress', name: 'Local Address'},
- {id: 'remoteAddress', name: 'Remote Address'}
+ {id: 'remoteAddress', name: 'Remote Address'},
+ {id: 'messagesInTransit', name: 'Messages in Transit'},
+ {id: 'messagesInTransitSize', name: 'Messages in Transit Size'},
+ {id: 'messagesDelivered', name: 'Messages Delivered'},
+ {id: 'messagesDeliveredSize', name: 'Messages Delivered Size'},
+ {id: 'messagesAcknowledged', name: 'Messages Acknowledged'},
+ {id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages Acknowledged awaiting Commit'}
],
operationOptions: [
{id: 'EQUALS', name: 'Equals'},
@@ -181,7 +195,15 @@ var Artemis;
{ header: 'Address', itemField: 'addressName' , htmlTemplate: 'consumers-anchor-column-template', colActionFn: (item) => selectAddress(item.idx) },
{ header: 'Remote Address', itemField: 'remoteAddress' },
{ header: 'Local Address', itemField: 'localAddress' },
- { header: 'Creation Time', itemField: 'creationTime' }
+ { header: 'Creation Time', itemField: 'creationTime' },
+ { header: 'Messages in Transit', itemField: 'messagesInTransit' },
+ { header: 'Messages in Transit Size', itemField: 'messagesInTransitSize' },
+ { header: 'Messages Delivered', itemField: 'messagesDelivered' },
+ { header: 'Messages Delivered Size', itemField: 'messagesDeliveredSize' },
+ { header: 'Messages Acknowledged', itemField: 'messagesAcknowledged' },
+ { header: 'Messages Acknowledged awaiting Commit', itemField: 'messagesAcknowledgedAwaitingCommit' },
+ { header: 'Last Delivered Time', itemField: 'lastDeliveredTime', templateFn: function(value) { return formatTimestamp(value);} },
+ { header: 'Last Acknowledged Time', itemField: 'lastAcknowledgedTime' , templateFn: function(value) { return formatTimestamp(value);} }
];
ctrl.refresh = function () {
@@ -279,6 +301,25 @@ var Artemis;
}
};
+ function formatTimestamp(timestamp) {
+ Artemis.log.info("in timestamp " + timestamp + " " + (typeof timestamp !== "number"));
+ if (isNaN(timestamp) || typeof timestamp !== "number") {
+ Artemis.log.info("returning timestamp " + timestamp + " " + (typeof timestamp !== "number"));
+ return timestamp;
+ }
+ if (timestamp === 0) {
+ return "N/A";
+ }
+ var d = new Date(timestamp);
+ // "yyyy-MM-dd HH:mm:ss"
+ //add 1 to month as getmonth returns the position not the actual month
+ return d.getFullYear() + "-" + pad2(d.getMonth() + 1) + "-" + pad2(d.getDate()) + " " + pad2(d.getHours()) + ":" + pad2(d.getMinutes()) + ":" + pad2(d.getSeconds());
+ }
+ function pad2(value) {
+ return (value < 10 ? '0' : '') + value;
+ }
+
+
ctrl.pagination.setOperation(ctrl.loadOperation);
function onError(response) {
diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js
index 9602336886..a58ebcef84 100644
--- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js
+++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js
@@ -72,6 +72,7 @@ var Artemis;
ordering: false,
columns: [
{name: "ID", visible: true},
+ {name: "Name", visible: true},
{name: "Session", visible: true},
{name: "Client ID", visible: true},
{name: "Protocol", visible: true},
@@ -79,7 +80,10 @@ var Artemis;
{name: "Validated User", visible: false},
{name: "Address", visible: true},
{name: "Remote Address", visible: true},
- {name: "Local Address", visible: true}
+ {name: "Local Address", visible: true},
+ {name: "Messages Sent", visible: false},
+ {name: "Messages Sent Size", visible: false},
+ {name: "Last Produced Message ID", visible: false}
]
};
@@ -103,6 +107,7 @@ var Artemis;
ctrl.filter = {
fieldOptions: [
{id: 'id', name: 'ID'},
+ {id: 'name', name: 'Name'},
{id: 'session', name: 'Session'},
{id: 'clientID', name: 'Client ID'},
{id: 'user', name: 'User'},
@@ -140,6 +145,7 @@ var Artemis;
};
ctrl.tableColumns = [
{ header: 'ID', itemField: 'id' },
+ { header: 'Name', itemField: 'name' },
{ header: 'Session', itemField: 'session' , htmlTemplate: 'producers-anchor-column-template', colActionFn: (item) => selectSession(item.idx) },
{ header: 'Client ID', itemField: 'clientID' },
{ header: 'Protocol', itemField: 'protocol' },
@@ -147,7 +153,10 @@ var Artemis;
{ header: 'Validated User', name: 'validatedUser'},
{ header: 'Address', itemField: 'addressName' , htmlTemplate: 'producers-anchor-column-template', colActionFn: (item) => selectAddress(item.idx) },
{ header: 'Remote Address', itemField: 'remoteAddress' },
- { header: 'Local Address', itemField: 'localAddress' }
+ { header: 'Local Address', itemField: 'localAddress' },
+ { header: 'Messages Sent', itemField: 'msgSent'},
+ { header: 'Messages Sent Size', itemField: 'msgSizeSent'},
+ { header: 'Last Produced Message ID', itemField: 'lastProducedMessageID'}
];
ctrl.refresh = function () {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 637cdf10f2..0603688a96 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -202,14 +201,14 @@ public class AMQPSessionCallback implements SessionCallback {
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
+ null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), false);
} else {
serverSession = manager.getServer().createSession(name, connection.getUser(), connection.getPassword(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser());
+ null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser(), false);
}
}
@@ -532,7 +531,7 @@ public class AMQPSessionCallback implements SessionCallback {
OperationContext oldContext = recoverContext();
try {
if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) {
- serverSession.send(transaction, message, directDeliver, false, routingContext);
+ serverSession.send(transaction, message, directDeliver, receiver.getName(), false, routingContext);
afterIO(new IOCallback() {
@Override
@@ -744,8 +743,8 @@ public class AMQPSessionCallback implements SessionCallback {
return protonSPI.invokeOutgoingInterceptors(message, connection);
}
- public void addProducer(ServerProducer serverProducer) {
- serverSession.addProducer(serverProducer);
+ public void addProducer(String name, String address) {
+ serverSession.addProducer(name, ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME, address);
}
public void removeProducer(String name) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 90c271cfde..b64f41f3f8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ServerProducer;
-import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -160,7 +158,6 @@ public class AMQPSessionContext extends ProtonInitializable {
}
public void removeReceiver(Receiver receiver) {
- sessionSPI.removeProducer(receiver.getName());
receivers.remove(receiver);
}
@@ -229,8 +226,7 @@ public class AMQPSessionContext extends ProtonInitializable {
AMQPMirrorControllerTarget protonReceiver = new AMQPMirrorControllerTarget(sessionSPI, connection, this, receiver, server);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
- ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
- sessionSPI.addProducer(serverProducer);
+ sessionSPI.addProducer(receiver.getName(), receiver.getTarget().getAddress());
receiver.setContext(protonReceiver);
HashMap brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, server.getNodeID().toString());
@@ -255,8 +251,7 @@ public class AMQPSessionContext extends ProtonInitializable {
ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
- ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
- sessionSPI.addProducer(serverProducer);
+ sessionSPI.addProducer(receiver.getName(), receiver.getTarget().getAddress());
receiver.setContext(protonReceiver);
connection.runNow(() -> {
receiver.open();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 199e8c2d95..33e87bd5ed 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -270,6 +270,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
+ sessionSPI.removeProducer(receiver.getName());
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index fc6b5fd91b..fe1b9c50c6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -169,7 +169,9 @@ public class MQTTConnectionManager {
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
server.newOperationContext(),
session.getProtocolManager().getPrefixes(),
- session.getProtocolManager().getSecurityDomain(), validatedUser);
+ session.getProtocolManager().getSecurityDomain(),
+ validatedUser,
+ false);
return (ServerSessionImpl) serverSession;
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index c65cf2186b..0059140a31 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -42,8 +42,10 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@@ -72,6 +74,10 @@ public class MQTTPublishManager {
private SimpleString managementAddress;
+ private final String senderName = UUIDGenerator.getInstance().generateUUID().toString();
+
+ private boolean createProducer = true;
+
private ServerConsumer managementConsumer;
private MQTTSession session;
@@ -173,6 +179,10 @@ public class MQTTPublishManager {
*/
void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
synchronized (lock) {
+ if (createProducer) {
+ session.getServerSession().addProducer(senderName, MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
+ createProducer = false;
+ }
String topic = message.variableHeader().topicName();
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
@@ -206,7 +216,6 @@ public class MQTTPublishManager {
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
}
-
int messageId = message.variableHeader().packetId();
if (qos < 2 || !state.getPubRec().contains(messageId)) {
if (qos == 2 && !internal)
@@ -217,7 +226,7 @@ public class MQTTPublishManager {
if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
}
- session.getServerSession().send(tx, serverMessage, true, false);
+ session.getServerSession().send(tx, serverMessage, true, senderName, false);
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@@ -303,7 +312,7 @@ public class MQTTPublishManager {
if (ref != null) {
Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
//send the management message via the internal server session to bypass security.
- session.getInternalServerSession().send(m, true);
+ session.getInternalServerSession().send(m, true, senderName);
session.getServerSession().individualAcknowledge(ref.getB(), ref.getA());
releaseFlowControl(ref.getB());
} else {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 943d9a3cf2..bb22cc48d0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -837,7 +837,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void createInternalSession(ConnectionInfo info) throws Exception {
- internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser);
+ internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false);
}
//raise the refCount of context
@@ -1261,6 +1261,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
ss.addProducer(info);
+ getSession(info.getProducerId().getParentId()).getCoreSession().addProducer(
+ info.getProducerId().toString(),
+ OpenWireProtocolManagerFactory.OPENWIRE_PROTOCOL_NAME,
+ info.getDestination() != null ? info.getDestination().getPhysicalName() : null);
}
return null;
@@ -1290,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
synchronized (producerExchanges) {
producerExchanges.remove(id);
+ getSession(id.getParentId()).getCoreSession().removeProducer(id.toString());
}
return null;
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 057160ba33..d064d5036c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -362,6 +362,7 @@ public class AMQConsumer {
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction, serverConsumer);
+ serverConsumer.metricsAcknowledge(ref, transaction);
removeRolledback(ref);
}
} else if (ack.isPoisonAck()) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 28caa08f92..aa7b899b9c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -132,7 +132,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser());
+ coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser(), false);
} catch (Exception e) {
logger.error("error init session", e);
}
@@ -392,7 +392,6 @@ public class AMQSession implements SessionCallback {
assert clientId.toString().equals(this.connection.getState().getInfo().getClientId()) : "Session cached clientId must be the same of the connection";
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, clientId);
-
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
@@ -438,7 +437,7 @@ public class AMQSession implements SessionCallback {
restoreAutoRead();
}
- getCoreSession().send(coreMsg, false, dest.isTemporary());
+ getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
if (count == null || count.decrementAndGet() == 0) {
if (sendProducerAck) {
@@ -462,7 +461,7 @@ public class AMQSession implements SessionCallback {
Exception exceptionToSend = null;
try {
- getCoreSession().send(coreMsg, false, dest.isTemporary());
+ getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
} catch (Exception e) {
logger.debug("Sending exception to the client", e);
exceptionToSend = e;
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
index f9c969c192..9071c755a5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
@@ -71,7 +71,7 @@ public class OpenWireConnectionTest {
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
- Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any());
+ Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), Mockito.anyBoolean());
OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index ab7e004f80..291b595399 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -247,7 +247,7 @@ public class StompProtocolManager extends AbstractProtocolManager= connection.getMinLargeMessageSize()) {
throw BUNDLE.headerTooBig();
@@ -402,7 +417,7 @@ public class StompSession implements SessionCallback {
largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);
- session.send(largeMessage.toMessage(), direct);
+ session.send(largeMessage.toMessage(), direct, senderName);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index 98e966ef6a..205956394e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -122,7 +122,7 @@ public abstract class AbstractControl extends StandardMBean {
Integer.MAX_VALUE, fakeConnection,
true, true, false,
false, address.toString(), fakeConnection.callback,
- false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser);
+ false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser, false);
try {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
@@ -159,7 +159,7 @@ public abstract class AbstractControl extends StandardMBean {
}
// There's no point on direct delivery using the management thread, use false here
- serverSession.send(message, false);
+ serverSession.send(message, false, null);
return "" + message.getMessageID();
} finally {
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 25deb59e09..bca62eedfa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.view.AddressView;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionView;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
import org.apache.activemq.artemis.core.management.impl.view.ProducerView;
import org.apache.activemq.artemis.core.management.impl.view.QueueView;
@@ -107,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
@@ -2609,7 +2611,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
Set producers = new HashSet<>();
for (ServerSession session : server.getSessions()) {
- producers.addAll(session.getServerProducers().values());
+ producers.addAll(session.getServerProducers());
}
ProducerView view = new ProducerView(server);
view.setCollection(producers);
@@ -2767,7 +2769,24 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
private JsonObject toJSONObject(ServerConsumer consumer) throws Exception {
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("deliveringCount", consumer.getDeliveringMessages().size());
+ List deliveringMessages = consumer.getDeliveringMessages();
+ JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
+ .add(ConsumerField.ID.getAlternativeName(), consumer.getID())
+ .add(ConsumerField.CONNECTION.getAlternativeName(), consumer.getConnectionID().toString())
+ .add(ConsumerField.SESSION.getAlternativeName(), consumer.getSessionID())
+ .add(ConsumerField.QUEUE.getAlternativeName(), consumer.getQueue().getName().toString())
+ .add(ConsumerField.BROWSE_ONLY.getName(), consumer.isBrowseOnly())
+ .add(ConsumerField.CREATION_TIME.getName(), consumer.getCreationTime())
+ // deliveringCount is renamed as MESSAGES_IN_TRANSIT but left in json for backward compatibility
+ .add(ConsumerField.MESSAGES_IN_TRANSIT.getAlternativeName(), consumer.getMessagesInTransit())
+ .add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), consumer.getMessagesInTransit())
+ .add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), consumer.getMessagesInTransitSize())
+ .add(ConsumerField.MESSAGES_DELIVERED.getName(), consumer.getMessagesDelivered())
+ .add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), consumer.getMessagesDeliveredSize())
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), consumer.getMessagesAcknowledged())
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), consumer.getMessagesAcknowledgedAwaitingCommit())
+ .add(ConsumerField.LAST_DELIVERED_TIME.getName(), consumer.getLastDeliveredTime())
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), consumer.getLastAcknowledgedTime());
if (consumer.getFilter() != null) {
obj.add("filter", consumer.getFilter().getFilterString().toString());
}
@@ -4407,14 +4426,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
- public String[] listTargetAddresses(final String sessionID) {
- ServerSession session = server.getSessionByID(sessionID);
- if (session != null) {
- return session.getTargetAddresses();
- }
- return new String[0];
- }
-
@Override
public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 9bced82671..5b37b6570d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -1826,9 +1827,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (consumer instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) consumer;
-
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
-
+ JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
+ .add(ConsumerField.ID.getAlternativeName(), serverConsumer.getID())
+ .add(ConsumerField.CONNECTION.getAlternativeName(), serverConsumer.getConnectionID().toString())
+ .add(ConsumerField.SESSION.getAlternativeName(), serverConsumer.getSessionID())
+ .add(ConsumerField.BROWSE_ONLY.getName(), serverConsumer.isBrowseOnly())
+ .add(ConsumerField.CREATION_TIME.getName(), serverConsumer.getCreationTime())
+ .add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), serverConsumer.getMessagesInTransit())
+ .add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), serverConsumer.getMessagesInTransitSize())
+ .add(ConsumerField.MESSAGES_DELIVERED.getName(), serverConsumer.getMessagesDelivered())
+ .add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), serverConsumer.getMessagesDeliveredSize())
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), serverConsumer.getMessagesAcknowledged())
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), serverConsumer.getMessagesAcknowledgedAwaitingCommit())
+ .add(ConsumerField.LAST_DELIVERED_TIME.getName(), serverConsumer.getLastDeliveredTime())
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), serverConsumer.getLastAcknowledgedTime());
jsonArray.add(obj);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
index c0d2cf38f4..c4066a19a5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
@@ -20,9 +20,10 @@ import java.util.Map;
import java.util.TreeMap;
public enum ConsumerField {
- ID("id"),
- SESSION("session"),
- QUEUE("queue"),
+ ID("id", "consumerID"),
+ SESSION("session", "sessionID"),
+ CONNECTION("connection", "connectionID"),
+ QUEUE("queue", "queueName"),
FILTER("filter"),
ADDRESS("address"),
USER("user"),
@@ -32,7 +33,16 @@ public enum ConsumerField {
LOCAL_ADDRESS("localAddress"),
REMOTE_ADDRESS("remoteAddress"),
QUEUE_TYPE("queueType"),
- CREATION_TIME("creationTime");
+ BROWSE_ONLY("browseOnly"),
+ CREATION_TIME("creationTime"),
+ MESSAGES_IN_TRANSIT("messagesInTransit", "deliveringCount"),
+ MESSAGES_IN_TRANSIT_SIZE("messagesInTransitSize"),
+ MESSAGES_DELIVERED("messagesDelivered"),
+ MESSAGES_DELIVERED_SIZE("messagesDeliveredSize"),
+ MESSAGES_ACKNOWLEDGED("messagesAcknowledged"),
+ MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
+ LAST_DELIVERED_TIME("lastDeliveredTime"),
+ LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime");
private static final Map lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -44,12 +54,28 @@ public enum ConsumerField {
private final String name;
+ private final String alternativeName;
+
public String getName() {
return name;
}
+ /**
+ * There is some inconsistency with some json objects returned for consumers because they were hard coded.
+ * This is just to track the differences and provide backward compatibility.
+ * @return the old alternative name
+ */
+ public String getAlternativeName() {
+ return alternativeName;
+ }
+
ConsumerField(String name) {
+ this(name, "");
+ }
+
+ ConsumerField(String name, String alternativeName) {
this.name = name;
+ this.alternativeName = alternativeName;
}
public static ConsumerField valueOfName(String name) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
index 77ffb304ae..97eaf93ee2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.management.impl.view;
+import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import java.util.Date;
@@ -46,7 +47,7 @@ public class ConsumerView extends ActiveMQAbstractView {
@Override
public JsonObjectBuilder toJson(ServerConsumer consumer) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
-
+ ActiveMQServerControlImpl serverControl = server.getActiveMQServerControl();
//if session is not available then consumer is not in valid state - ignore
if (session == null) {
return null;
@@ -71,7 +72,15 @@ public class ConsumerView extends ActiveMQAbstractView {
.add(ConsumerField.ADDRESS.getName(), toString(consumer.getQueueAddress()))
.add(ConsumerField.LOCAL_ADDRESS.getName(), toString(consumer.getConnectionLocalAddress()))
.add(ConsumerField.REMOTE_ADDRESS.getName(), toString(consumer.getConnectionRemoteAddress()))
- .add(ConsumerField.CREATION_TIME.getName(), new Date(consumer.getCreationTime()).toString());
+ .add(ConsumerField.CREATION_TIME.getName(), new Date(consumer.getCreationTime()).toString())
+ .add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), toString(consumer.getMessagesInTransit()))
+ .add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), toString(consumer.getMessagesInTransitSize()))
+ .add(ConsumerField.MESSAGES_DELIVERED.getName(), toString(consumer.getMessagesDelivered()))
+ .add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), toString(consumer.getMessagesDeliveredSize()))
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), toString(consumer.getMessagesAcknowledged()))
+ .add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), toString(consumer.getMessagesAcknowledgedAwaitingCommit()))
+ .add(ConsumerField.LAST_DELIVERED_TIME.getName(), consumer.getLastDeliveredTime())
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), consumer.getLastAcknowledgedTime());
return obj;
}
@@ -111,6 +120,22 @@ public class ConsumerView extends ActiveMQAbstractView {
return consumer.getConnectionRemoteAddress();
case CREATION_TIME:
return new Date(consumer.getCreationTime());
+ case MESSAGES_IN_TRANSIT:
+ return consumer.getMessagesInTransit();
+ case MESSAGES_IN_TRANSIT_SIZE:
+ return consumer.getMessagesInTransitSize();
+ case MESSAGES_DELIVERED:
+ return consumer.getMessagesDelivered();
+ case MESSAGES_DELIVERED_SIZE:
+ return consumer.getMessagesDeliveredSize();
+ case MESSAGES_ACKNOWLEDGED:
+ return consumer.getMessagesAcknowledged();
+ case MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT:
+ return consumer.getMessagesAcknowledgedAwaitingCommit();
+ case LAST_DELIVERED_TIME:
+ return consumer.getLastDeliveredTime();
+ case LAST_ACKNOWLEDGED_TIME:
+ return consumer.getLastAcknowledgedTime();
default:
throw new IllegalArgumentException("Unsupported field, " + fieldName);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerField.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerField.java
index 87f9cb2ecc..075f2ccf30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerField.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerField.java
@@ -21,15 +21,20 @@ import java.util.TreeMap;
public enum ProducerField {
ID("id"),
- SESSION("session"),
+ NAME("name"),
+ SESSION("session", "sessionID"),
CONNECTION_ID("connectionID"),
- ADDRESS("address"), USER("user"),
+ ADDRESS("address", "destination"),
+ USER("user"),
VALIDATED_USER("validatedUser"),
PROTOCOL("protocol"),
CLIENT_ID("clientID"),
LOCAL_ADDRESS("localAddress"),
REMOTE_ADDRESS("remoteAddress"),
- CREATION_TIME("creationTime");
+ CREATION_TIME("creationTime"),
+ MESSAGE_SENT("msgSent"),
+ MESSAGE_SENT_SIZE("msgSizeSent"),
+ LAST_PRODUCED_MESSAGE_ID("lastProducedMessageID");
private static final Map lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -41,12 +46,27 @@ public enum ProducerField {
private final String name;
+ private final String alternativeName;
+
public String getName() {
return name;
}
+ /**
+ * There is some inconsistency with some json objects returned for consumers because they were hard coded.
+ * This is just to track the differences and provide backward compatibility.
+ * @return the old alternative name
+ */
+ public String getAlternativeName() {
+ return alternativeName;
+ }
+
ProducerField(String name) {
+ this(name, "");
+ }
+ ProducerField(String name, String alternativeName) {
this.name = name;
+ this.alternativeName = alternativeName;
}
public static ProducerField valueOfName(String name) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerView.java
index cfddf7c65d..76556f0ab6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ProducerView.java
@@ -59,6 +59,7 @@ public class ProducerView extends ActiveMQAbstractView {
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ProducerField.ID.getName(), toString(producer.getID()))
+ .add(ProducerField.NAME.getName(), toString(producer.getName()))
.add(ProducerField.SESSION.getName(), toString(session.getName()))
.add(ProducerField.CLIENT_ID.getName(), toString(sessionClientID))
.add(ProducerField.USER.getName(), toString(session.getUsername()))
@@ -67,7 +68,10 @@ public class ProducerView extends ActiveMQAbstractView {
.add(ProducerField.ADDRESS.getName(), toString(producer.getAddress() != null ? producer.getAddress() : session.getDefaultAddress()))
.add(ProducerField.LOCAL_ADDRESS.getName(), toString(session.getRemotingConnection().getTransportConnection().getLocalAddress()))
.add(ProducerField.REMOTE_ADDRESS.getName(), toString(session.getRemotingConnection().getTransportConnection().getRemoteAddress()))
- .add(ProducerField.CREATION_TIME.getName(), toString(producer.getCreationTime()));
+ .add(ProducerField.CREATION_TIME.getName(), toString(producer.getCreationTime()))
+ .add(ProducerField.MESSAGE_SENT.getName(), producer.getMessagesSent())
+ .add(ProducerField.MESSAGE_SENT_SIZE.getName(), producer.getMessagesSentSize())
+ .add(ProducerField.LAST_PRODUCED_MESSAGE_ID.getName(), toString(producer.getLastProducedMessageID()));
return obj;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConsumerFilterPredicate.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConsumerFilterPredicate.java
index b42e5a473a..ceb48eeb7a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConsumerFilterPredicate.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/predicate/ConsumerFilterPredicate.java
@@ -59,6 +59,18 @@ public class ConsumerFilterPredicate extends ActiveMQFilterPredicate poolNullResponseV2;
+ private final IntObjectHashMap producers = new IntObjectHashMap<>();
+
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
final Channel channel) {
@@ -323,7 +334,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_SEND_CONTINUATION: {
SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
requiresResponse = message.isRequiresResponse();
- sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+ int senderID = message.getSenderID();
+ sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues(), senderID);
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@@ -635,6 +647,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
break;
}
+ case CREATE_PRODUCER: {
+ CreateProducerMessage message = (CreateProducerMessage) packet;
+ if (!producers.containsKey(message.getId())) {
+ // this is used to create/destroy the producer so needs to be unique
+ String senderName = PRODUCER_ID_PREFIX + UUIDGenerator.getInstance().generateUUID();
+ producers.put(message.getId(), senderName);
+ session.addProducer(senderName, ActiveMQClient.DEFAULT_CORE_PROTOCOL, message.getAddress() != null ? message.getAddress().toString() : null);
+ } else {
+ ActiveMQServerLogger.LOGGER.producerAlreadyExists(message.getId(),session.getName(), remotingConnection.getRemoteAddress());
+ }
+ break;
+ }
+ case REMOVE_PRODUCER: {
+ RemoveProducerMessage message = (RemoveProducerMessage) packet;
+ String remove = producers.remove(message.getId());
+ if (remove != null) {
+ session.removeProducer(remove);
+ } else {
+ ActiveMQServerLogger.LOGGER.producerDoesNotExist(message.getId(), session.getName(), remotingConnection.getRemoteAddress());
+ }
+ break;
+ }
}
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
@@ -747,7 +781,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
try {
final SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
- this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage(), storageManager), this.direct);
+ this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage(), storageManager), this.direct, producers.get(message.getSenderID()));
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@@ -1049,7 +1083,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private void sendContinuations(final int packetSize,
final long messageBodySize,
final byte[] body,
- final boolean continues) throws Exception {
+ final boolean continues,
+ final int senderID) throws Exception {
synchronized (largeMessageLock) {
if (currentLargeMessage == null) {
@@ -1071,7 +1106,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
- session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
+ session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, producers.get(senderID), false);
}
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 64dd56d196..28c53c4602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -45,9 +45,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.slf4j.Logger;
@@ -183,9 +181,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
Map routingTypeMap = protocolManager.getPrefixes();
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
- ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser);
- ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
- session.addProducer(serverProducer);
+ boolean isLegacyProducer = request.getVersion() < PacketImpl.ARTEMIS_2_28_0_VERSION;
+ ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser, isLegacyProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel);
channel.setHandler(handler);
sessionCallback.setSessionHandler(handler);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 2f690afa8d..97031c2df6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -349,7 +349,9 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map prefixes,
- String securityDomain, String validatedUser) throws Exception;
+ String securityDomain,
+ String validatedUser,
+ boolean isLegacyProducer) throws Exception;
/** This is to be used in places where security is bypassed, like internal sessions, broker connections, etc... */
ServerSession createInternalSession(String name,
@@ -364,7 +366,8 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map prefixes,
- String securityDomain) throws Exception;
+ String securityDomain,
+ boolean isLegacyProducer) throws Exception;
/** should the server rebuild page counters upon startup.
* this will be useful on testing or an embedded broker scenario */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 2b9699df52..eaec474036 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1220,7 +1220,11 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222308, value = "Unable to listen for incoming fail-back request because {} is null. Ensure the broker has the proper cluster-connection configuration.", level = LogMessage.Level.WARN)
void failBackCheckerFailure(String component);
+ @LogMessage(id = 222309, value = "Trying to remove a producer with ID {} that doesnt exist from session {} on Connection {}.", level = LogMessage.Level.WARN)
+ void producerDoesNotExist(int id, String session, String remoteAddress);
+ @LogMessage(id = 222310, value = "Trying to add a producer with ID {} that already exists to session {} on Connection {}.", level = LogMessage.Level.WARN)
+ void producerAlreadyExists(int id, String session, String remoteAddress);
@LogMessage(id = 224000, value = "Failure in initialisation", level = LogMessage.Level.ERROR)
void initializationError(Throwable e);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerInfo.java
index 23642565b4..3ac36b3ef7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerInfo.java
@@ -62,4 +62,51 @@ public interface ConsumerInfo {
*/
String getConnectionRemoteAddress();
+ /**
+ * Returns how many messages are out for delivery but not yet acknowledged
+ * @return delivering count
+ */
+ int getMessagesInTransit();
+
+ /**
+ * Returns the combined size of all the messages out for delivery but not yet acknowledged
+ * @return the total size of all the messages
+ */
+ long getMessagesInTransitSize();
+
+ /**
+ * Returns The total number of messages sent to a consumer including redeliveries that have been acknowledged
+ * @return the total number of messages delivered.
+ */
+ long getMessagesDelivered();
+
+ /**
+ * Returns the total size of all the messages delivered to the consumer. This includes redelivered messages
+ * @return The total size of all the messages
+ */
+ long getMessagesDeliveredSize();
+
+ /**
+ * Returns the number of messages acknowledged by this consumer since it was created
+ * @return messages acknowledged
+ */
+ long getMessagesAcknowledged();
+
+ /**
+ * Returns the number of acknowledged messages that are awaiting commit in a transaction
+ * @return th eno acknowledgements awaiting commit
+ */
+ int getMessagesAcknowledgedAwaitingCommit();
+
+ /**
+ * Returns the time in milliseconds that the last message was delivered to a consumer
+ * @return the time of the last message delivered
+ */
+ long getLastDeliveredTime();
+
+ /**
+ * Returns the time in milliseconds that the last message was acknowledged by a consumer
+ * @return the time of the last message was acknowledged
+ */
+ long getLastAcknowledgedTime();
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index e743c0401c..646df6a787 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -110,4 +110,11 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
long getCreationTime();
String getSessionID();
+
+ /**
+ * This is needed when some protocols (OW) handle the acks themselves and need to update the metrics
+ * @param ref the message reference
+ * @param transaction the tx
+ */
+ void metricsAcknowledge(MessageReference ref, Transaction transaction);
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java
index 97f8fe52d4..64abf8eed7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java
@@ -17,7 +17,12 @@
package org.apache.activemq.artemis.core.server;
public interface ServerProducer {
- String getAddress();
+ String ANONYMOUS = "ANONYMOUS";
+
+ long getID();
+
+ String getName();
+ String getAddress();;
String getProtocol();
@@ -29,7 +34,14 @@ public interface ServerProducer {
String getConnectionID();
- String getID();
long getCreationTime();
+
+ Object getLastProducedMessageID();
+
+ long getMessagesSent();
+
+ long getMessagesSentSize();
+
+ void updateMetrics(Object lastProducedMessageID, int encodeSize);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index f238a4925d..2d880f1ec8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -332,11 +333,13 @@ public interface ServerSession extends SecurityAuth {
RoutingStatus send(Transaction tx,
Message message,
boolean direct,
+ String senderName,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Transaction tx,
Message message,
boolean direct,
+ String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
@@ -345,18 +348,20 @@ public interface ServerSession extends SecurityAuth {
Message msg,
SimpleString originalAddress,
boolean direct,
+ String senderName,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus doSend(Transaction tx,
Message msg,
SimpleString originalAddress,
boolean direct,
+ String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
- RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
+ RoutingStatus send(Message message, boolean direct, String senderName, boolean noAutoCreateQueue) throws Exception;
- RoutingStatus send(Message message, boolean direct) throws Exception;
+ RoutingStatus send(Message message, boolean direct, String senderName) throws Exception;
void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
@@ -376,8 +381,6 @@ public interface ServerSession extends SecurityAuth {
Map getMetaData();
- String[] getTargetAddresses();
-
/**
* Add all the producers detail to the JSONArray object.
* This is a method to be used by the management layer.
@@ -387,7 +390,6 @@ public interface ServerSession extends SecurityAuth {
*/
void describeProducersInfo(JsonArrayBuilder objs) throws Exception;
- String getLastSentMessageID(String address);
long getCreationTime();
@@ -522,11 +524,11 @@ public interface ServerSession extends SecurityAuth {
Pair> getAddressAndRoutingTypes(SimpleString address,
EnumSet defaultRoutingTypes);
- void addProducer(ServerProducer serverProducer);
+ void addProducer(String name, String protocol, String address);
void removeProducer(String ID);
- Map getServerProducers();
+ Collection getServerProducers();
String getDefaultAddress();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 9e122d670e..2a313ce4a2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1746,7 +1746,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final OperationContext context,
final Map prefixes,
final String securityDomain,
- String validatedUser) throws Exception {
+ String validatedUser,
+ boolean isLegacyProducer) throws Exception {
if (validatedUser == null) {
validatedUser = validateUser(username, password, connection, securityDomain);
}
@@ -1758,7 +1759,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, prefixes);
}
- final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain);
+ final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain, isLegacyProducer);
return session;
}
@@ -1786,8 +1787,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean autoCreateQueues,
OperationContext context,
Map prefixes,
- String securityDomain) throws Exception {
- ServerSessionImpl session = internalCreateSession(name, null, null, null, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain);
+ String securityDomain,
+ boolean isLegacyProducer) throws Exception {
+ ServerSessionImpl session = internalCreateSession(name, null, null, null, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain, isLegacyProducer);
session.disableSecurity();
return session;
}
@@ -1864,14 +1866,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
OperationContext context,
boolean autoCreateQueues,
Map prefixes,
- String securityDomain) throws Exception {
+ String securityDomain,
+ boolean isLegacyProducer) throws Exception {
if (hasBrokerSessionPlugins()) {
callBrokerSessionPlugins(plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes));
}
- ServerSessionImpl session = new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes, securityDomain);
+ ServerSessionImpl session = new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes, securityDomain, isLegacyProducer);
sessions.put(name, session);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e8ed25652b..d75a939336 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -26,7 +26,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -61,6 +63,8 @@ import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -153,14 +157,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private AtomicLong messageConsumedSnapshot = new AtomicLong(0);
- private long acks;
-
private boolean requiresLegacyPrefix = false;
private boolean anycast = false;
private boolean isClosed = false;
+ ServerConsumerMetrics metrics = new ServerConsumerMetrics();
+
public ServerConsumerImpl(final long id,
final ServerSession session,
@@ -355,6 +359,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return this.session.getName();
}
+ @Override
+ public void metricsAcknowledge(MessageReference ref, Transaction transaction) {
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), transaction);
+ }
+
@Override
public List getDeliveringMessages() {
synchronized (lock) {
@@ -441,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
deliveringRefs.add(ref);
}
+ metrics.addMessage(ref.getMessage().getEncodeSize());
+
ref.handled();
ref.setConsumerId(this.id);
@@ -466,7 +477,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (preAcknowledge) {
// With pre-ack, we ack *before* sending to the client
ref.getQueue().acknowledge(ref, this);
- acks++;
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
}
}
@@ -704,9 +715,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final List refs = new ArrayList<>(deliveringRefs.size());
MessageReference ref;
while ((ref = deliveringRefs.poll()) != null) {
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
if (performACK) {
ref.acknowledge(tx, this);
-
performACK = false;
} else {
refs.add(ref);
@@ -911,8 +922,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.acknowledge(tx, this);
ackedRefs.add(ref.getMessageID());
-
- acks++;
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
}
while (ref.getMessageID() != messageID);
@@ -973,11 +983,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.markAsRollbackOnly(ils);
throw ils;
}
-
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
ref.acknowledge(tx, this);
- acks++;
-
if (startedTransaction) {
tx.commit();
}
@@ -1016,7 +1024,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (!failed) {
ref.decrementDeliveryCount();
}
-
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
ref.getQueue().cancel(ref, System.currentTimeMillis());
}
@@ -1032,7 +1040,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (ref == null) {
return; // nothing to be done
}
-
+ metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
ref.getQueue().sendToDeadLetterAddress(null, ref);
}
@@ -1040,6 +1048,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public synchronized void backToDelivering(MessageReference reference) {
synchronized (lock) {
deliveringRefs.addFirst(reference);
+ metrics.addMessage(reference.getMessage().getEncodeSize());
}
}
@@ -1059,10 +1068,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
- return deliveringRefs.poll();
+ MessageReference ref = deliveringRefs.poll();
+ return ref;
}
//slow path in a separate method
- return removeDeliveringRefById(messageID);
+ MessageReference ref = removeDeliveringRefById(messageID);
+ return ref;
}
}
@@ -1114,6 +1125,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public float getRate() {
float timeSlice = ((System.currentTimeMillis() - consumerRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
+ long acks = metrics.getMessagesAcknowledged();
if (timeSlice == 0) {
messageConsumedSnapshot.getAndSet(acks);
return 0.0f;
@@ -1513,7 +1525,143 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
+ @Override
+ public long getMessagesInTransitSize() {
+ return metrics.getMessagesInTransitSize();
+ }
+
+ @Override
+ public int getMessagesInTransit() {
+ return deliveringRefs.size();
+ }
+
+ @Override
+ public long getLastDeliveredTime() {
+ return metrics.getLastDeliveredTime();
+ }
+
+ @Override
+ public long getLastAcknowledgedTime() {
+ return metrics.getLastAcknowledgedTime();
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return metrics.getMessagesAcknowledged();
+ }
+
+ @Override
+ public long getMessagesDeliveredSize() {
+ return metrics.getMessagesDeliveredSize();
+ }
+
+ @Override
+ public long getMessagesDelivered() {
+ return metrics.getMessagesDelivered();
+ }
+
+ @Override
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return metrics.getMessagesAcknowledgedAwaitingCommit();
+ }
+
public SessionCallback getCallback() {
return callback;
}
+
+ static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+ /**
+ * Since messages can be delivered (incremented) and acknowledged (decremented) at the same time we have to protect
+ * the encode size and make it atomic. The other fields are ok since they are only accessed from a single thread.
+ */
+ private static final AtomicLongFieldUpdater messagesInTransitSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesInTransitSize");
+
+ private volatile long messagesInTransitSize = 0;
+
+ private static final AtomicIntegerFieldUpdater messagesAcknowledgedAwaitingCommitUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledgedAwaitingCommit");
+
+ private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+ private static final AtomicLongFieldUpdatermessagesDeliveredSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDeliveredSize");
+
+ private volatile long messagesDeliveredSize = 0;
+
+ private volatile long lastDeliveredTime = 0;
+
+ private volatile long lastAcknowledgedTime = 0;
+
+ private static final AtomicLongFieldUpdatermessagesDeliveredUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDelivered");
+
+ private volatile long messagesDelivered = 0;
+
+ private static final AtomicLongFieldUpdater messagesAcknowledgedUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledged");
+
+ private volatile long messagesAcknowledged = 0;
+
+
+ public long getMessagesInTransitSize() {
+ return messagesInTransitSizeUpdater.get(this);
+ }
+
+ public long getMessagesDeliveredSize() {
+ return messagesDeliveredSizeUpdater.get(this);
+ }
+
+ public long getLastDeliveredTime() {
+ return lastDeliveredTime;
+ }
+
+ public long getLastAcknowledgedTime() {
+ return lastAcknowledgedTime;
+ }
+
+ public long getMessagesDelivered() {
+ return messagesDeliveredUpdater.get(this);
+ }
+
+ public long getMessagesAcknowledged() {
+ return messagesAcknowledgedUpdater.get(this);
+ }
+
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return messagesAcknowledgedAwaitingCommitUpdater.get(this);
+ }
+
+ public void addMessage(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+ messagesDeliveredSizeUpdater.addAndGet(this, encodeSize);
+ messagesDeliveredUpdater.addAndGet(this, 1);
+ lastDeliveredTime = System.currentTimeMillis();
+ }
+
+ public void addAcknowledge(int encodeSize, Transaction tx) {
+ messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+ messagesAcknowledgedUpdater.addAndGet(this, 1);
+ lastAcknowledgedTime = System.currentTimeMillis();
+ if (tx != null) {
+ addOperation(tx);
+ messagesAcknowledgedAwaitingCommitUpdater.addAndGet(this, 1);
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
+ }
+
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
+ }
+
+ public void addOperation(Transaction tx) {
+ Object property = tx.getProperty(TransactionPropertyIndexes.CONSUMER_METRICS_OPERATION);
+ if (property == null) {
+ tx.putProperty(TransactionPropertyIndexes.CONSUMER_METRICS_OPERATION, this);
+ tx.addOperation(this);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerLegacyProducersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerLegacyProducersImpl.java
new file mode 100644
index 0000000000..e1ab46561a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerLegacyProducersImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ServerProducer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.utils.collections.MaxSizeMap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ServerLegacyProducersImpl implements ServerProducers {
+
+ private static final int MAX_PRODUCER_METRIC_SIZE = 100;
+
+ protected final Map producers = Collections.synchronizedMap(new MaxSizeMap<>(MAX_PRODUCER_METRIC_SIZE));
+ private final String sessionName;
+
+ private final String connectionID;
+
+ public ServerLegacyProducersImpl(ServerSession session) {
+ this.sessionName = session.getName();
+ this.connectionID = session.getConnectionID() != null ? session.getConnectionID().toString() : null;
+ }
+
+ @Override
+ public Map cloneProducers() {
+ return new HashMap<>(producers);
+ }
+
+ @Override
+ public Collection getServerProducers() {
+ return new ArrayList<>(producers.values());
+ }
+
+ @Override
+ public ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession) {
+ String address = msg.getAddress();
+ String name = sessionName + ":" + address;
+ ServerProducer producer = producers.get(name);
+ if (producer == null) {
+ producer = new ServerProducerImpl(name, "CORE", address);
+ producer.setSessionID(sessionName);
+ producer.setConnectionID(connectionID);
+ producers.put(name, producer);
+ }
+ return producer;
+ }
+
+ @Override
+ public void put(String id, ServerProducer producer) {
+ //never called as we track by address the old way
+ }
+
+ @Override
+ public void remove(String id) {
+ //never called as we track by address the old way
+ }
+
+ @Override
+ public void clear() {
+ producers.clear();
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java
index bc0f910484..cb7eef011d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java
@@ -18,23 +18,50 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ServerProducer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
public class ServerProducerImpl implements ServerProducer {
- private final String ID;
+
+ private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
+
+ private final long ID;
+ private final String name;
private final String protocol;
private final long creationTime;
+ private volatile long messagesSent = 0;
+ private volatile long messagesSentSize = 0;
+ private static final AtomicLongFieldUpdater messagesSentUpdater = AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
+
+ private static final AtomicLongFieldUpdater messagesSentSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
private final String address;
+
+ private volatile Object lastProducedMessageID;
+
private String sessionID;
+
private String connectionID;
- public ServerProducerImpl(String ID, String protocol, String address) {
- this.ID = ID;
+ public ServerProducerImpl(String name, String protocol, String address) {
+ this.ID = PRODUCER_ID_GENERATOR.incrementAndGet();
+ this.name = name;
this.protocol = protocol;
this.address = address;
this.creationTime = System.currentTimeMillis();
}
+ @Override
+ public long getID() {
+ return ID;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
@Override
public String getAddress() {
return address;
@@ -52,7 +79,6 @@ public class ServerProducerImpl implements ServerProducer {
@Override
public void setConnectionID(String connectionID) {
-
this.connectionID = connectionID;
}
@@ -66,13 +92,30 @@ public class ServerProducerImpl implements ServerProducer {
return connectionID;
}
- @Override
- public String getID() {
- return ID;
- }
-
@Override
public long getCreationTime() {
return creationTime;
}
+
+ @Override
+ public void updateMetrics(Object lastProducedMessageID, int encodeSize) {
+ messagesSentUpdater.addAndGet(this, 1);
+ messagesSentSizeUpdater.getAndAdd(this, encodeSize);
+ this.lastProducedMessageID = lastProducedMessageID;
+ }
+
+ @Override
+ public Object getLastProducedMessageID() {
+ return lastProducedMessageID;
+ }
+
+ @Override
+ public long getMessagesSent() {
+ return messagesSent;
+ }
+
+ @Override
+ public long getMessagesSentSize() {
+ return messagesSentSize;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducers.java
new file mode 100644
index 0000000000..99f79010fc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducers.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ServerProducer;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface ServerProducers {
+ Map cloneProducers();
+
+ Collection getServerProducers();
+
+ ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession);
+
+ void put(String id, ServerProducer producer);
+
+ void remove(String id);
+
+ void clear();
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducersImpl.java
new file mode 100644
index 0000000000..ca8561e326
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducersImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ServerProducer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ServerProducersImpl implements ServerProducers {
+ protected final Map producers = new ConcurrentHashMap<>();
+
+ @Override
+ public Map cloneProducers() {
+ return new HashMap<>(producers);
+ }
+
+ @Override
+ public Collection getServerProducers() {
+ return new ArrayList<>(producers.values());
+ }
+
+ @Override
+ public ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession) {
+ if (senderName != null) {
+ return producers.get(senderName);
+ }
+ return null;
+ }
+
+ @Override
+ public void put(String id, ServerProducer producer) {
+ producers.put(id, producer);
+ }
+
+ @Override
+ public void remove(String id) {
+ producers.remove(id);
+ }
+
+ @Override
+ public void clear() {
+ producers.clear();
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index af27a0e8ad..ce02f88e2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -25,17 +26,16 @@ import java.security.cert.X509Certificate;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -102,7 +102,6 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
-import org.apache.activemq.artemis.utils.collections.MaxSizeMap;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +114,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
private boolean securityEnabled = true;
private final String securityDomain;
@@ -140,7 +138,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final Map consumers = new ConcurrentHashMap<>();
- protected final Map producers = new ConcurrentHashMap<>();
+ protected final ServerProducers serverProducers;
protected Transaction tx;
@@ -184,9 +182,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final OperationContext context;
- // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
- protected final Map> targetAddressInfos = new MaxSizeMap<>(100);
-
private final long creationTime = System.currentTimeMillis();
// to prevent session from being closed twice.
@@ -228,7 +223,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final OperationContext context,
final PagingManager pagingManager,
final Map prefixes,
- final String securityDomain) throws Exception {
+ final String securityDomain,
+ boolean isLegacyProducer) throws Exception {
this.username = username;
this.password = password;
@@ -289,6 +285,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
sendSessionNotification(CoreNotificationType.SESSION_CREATED);
this.securityDomain = securityDomain;
+ if (isLegacyProducer) {
+ serverProducers = new ServerLegacyProducersImpl(this);
+ } else {
+ serverProducers = new ServerProducersImpl();
+ }
}
// ServerSession implementation ---------------------------------------------------------------------------
@@ -450,7 +451,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
consumers.clear();
- producers.clear();
+ serverProducers.clear();
if (closeables != null) {
for (Closeable closeable : closeables) {
@@ -1186,10 +1187,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.removeFailureListener(cleaner);
}
-
- if (server.getAddressInfo(unPrefixedQueueName) == null) {
- targetAddressInfos.remove(queueToDelete);
- }
}
@Override
@@ -1790,29 +1787,32 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
- public RoutingStatus send(final Message message, final boolean direct) throws Exception {
- return send(message, direct, false);
+ public RoutingStatus send(final Message message, final boolean direct, final String senderName) throws Exception {
+ return send(message, direct, senderName,false);
}
@Override
public RoutingStatus send(final Message message,
final boolean direct,
+ final String senderName,
boolean noAutoCreateQueue) throws Exception {
- return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
+ return send(getCurrentTransaction(), message, direct, senderName, noAutoCreateQueue);
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
final boolean direct,
+ final String senderName,
boolean noAutoCreateQueue) throws Exception {
- return send(tx, msg, direct, noAutoCreateQueue, routingContext);
+ return send(tx, msg, direct, senderName, noAutoCreateQueue, routingContext);
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message messageParameter,
final boolean direct,
+ final String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception {
final Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
@@ -1865,7 +1865,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = handleManagementMessage(tx, message, direct);
} else {
- result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
+ result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
}
if (AuditLogger.isMessageLoggingEnabled()) {
@@ -1975,30 +1975,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return metaData;
}
- @Override
- public String[] getTargetAddresses() {
- Map> copy = cloneTargetAddresses();
- Iterator iter = copy.keySet().iterator();
- int num = copy.keySet().size();
- String[] addresses = new String[num];
- int i = 0;
- while (iter.hasNext()) {
- addresses[i] = iter.next().toString();
- i++;
- }
- return addresses;
- }
-
- @Override
- public String getLastSentMessageID(String address) {
- Pair