diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java new file mode 100644 index 0000000000..25b2d97c23 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; + +public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits { + + protected int pendingCredits; + + private final int windowSize; + + protected volatile boolean closed; + + protected boolean blocked; + + protected final SimpleString address; + + private final ClientSessionInternal session; + + protected int arriving; + + private int refCount; + + protected boolean serverRespondedWithFail; + + protected SessionContext sessionContext; + + public AbstractProducerCreditsImpl(final ClientSessionInternal session, + final SimpleString address, + final int windowSize) { + this.session = session; + + this.address = address; + + this.windowSize = windowSize / 2; + } + + @Override + public SimpleString getAddress() { + return address; + } + + @Override + public void init(SessionContext sessionContext) { + // We initial request twice as many credits as we request in subsequent requests + // This allows the producer to keep sending as more arrive, minimising pauses + checkCredits(windowSize); + + this.sessionContext = sessionContext; + + this.sessionContext.linkFlowControl(address, this); + } + + @Override + public void acquireCredits(final int credits) throws ActiveMQException { + checkCredits(credits); + + actualAcquire(credits); + + afterAcquired(credits); + + } + + protected void afterAcquired(int credits) throws ActiveMQAddressFullException { + // check to see if the blocking mode is FAIL on the server + synchronized (this) { + pendingCredits -= credits; + } + } + + protected abstract void actualAcquire(int credits); + + @Override + public boolean isBlocked() { + return blocked; + } + + @Override + public void receiveFailCredits(final int credits) { + serverRespondedWithFail = true; + // receive credits like normal to keep the sender from blocking + receiveCredits(credits); + } + + + @Override + public void receiveCredits(final int credits) { + synchronized (this) { + arriving -= credits; + } + } + + + @Override + public synchronized void reset() { + // Any pendingCredits credits from before failover won't arrive, so we re-initialise + + int beforeFailure = pendingCredits; + + pendingCredits = 0; + arriving = 0; + + // If we are waiting for more credits than what's configured, then we need to use what we tried before + // otherwise the client may starve as the credit will never arrive + checkCredits(Math.max(windowSize * 2, beforeFailure)); + } + + @Override + public void close() { + // Closing a producer that is blocking should make it return + closed = true; + } + + @Override + public synchronized void incrementRefCount() { + refCount++; + } + + @Override + public synchronized int decrementRefCount() { + return --refCount; + } + + public abstract int getBalance(); + + protected void checkCredits(final int credits) { + int needed = Math.max(credits, windowSize); + + int toRequest = -1; + + synchronized (this) { + if (getBalance() + arriving < needed) { + toRequest = needed - arriving; + + pendingCredits += toRequest; + arriving += toRequest; + } + } + + if (toRequest != -1) { + requestCredits(toRequest); + } + } + + private void requestCredits(final int credits) { + session.sendProducerCreditsMessage(credits, address); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java new file mode 100644 index 0000000000..a49122f1e7 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; + +public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl { + + int balance; + + final ClientProducerFlowCallback callback; + + public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize, + ClientProducerFlowCallback callback) { + super(session, address, windowSize); + balance = windowSize; + this.callback = callback; + } + + @Override + protected synchronized void actualAcquire(int credits) { + synchronized (this) { + balance -= credits; + if (balance <= 0) { + callback.onCreditsFlow(true, this); + } + } + + } + + @Override + public int getBalance() { + return balance; + } + + @Override + public void receiveCredits(int credits) { + synchronized (this) { + super.receiveCredits(credits); + balance += credits; + callback.onCreditsFlow(balance <= 0, this); + } + + } + + + @Override + public void receiveFailCredits(final int credits) { + super.receiveFailCredits(credits); + callback.onCreditsFail(this); + } + + @Override + public void releaseOutstanding() { + synchronized (this) { + balance = 0; + callback.onCreditsFlow(true, this); + } + + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java index 3c10e1a236..bb65f72c7a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java @@ -36,4 +36,8 @@ public interface ClientProducerCreditManager { int creditsMapSize(); int unReferencedCreditsSize(); + + /** This will determine the flow control as asynchronous, + * no actual block should happen instead a callback will be sent whenever blockages change */ + void setCallback(ClientProducerFlowCallback callback); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java index 80b255ffd2..cd2db9cc2c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java @@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana private int windowSize; + private ClientProducerFlowCallback callback; + public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) { this.session = session; this.windowSize = windowSize; } + + /** This will determine the flow control as asynchronous, + * no actual block should happen instead a callback will be sent whenever blockages change */ + @Override + public void setCallback(ClientProducerFlowCallback callback) { + this.callback = callback; + } + @Override public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, @@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana if (credits == null) { // Doesn't need to be fair since session is single threaded - credits = new ClientProducerCreditsImpl(session, address, windowSize); + credits = build(address); needInit = true; producerCredits.put(address, credits); @@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana } } + private ClientProducerCredits build(SimpleString address) { + if (callback != null) { + return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback); + } else { + return new ClientProducerCreditsImpl(session, address, windowSize); + } + } + @Override public synchronized void returnCredits(final SimpleString address) { ClientProducerCredits credits = producerCredits.get(address); @@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana public void releaseOutstanding() { } + @Override + public SimpleString getAddress() { + return SimpleString.toSimpleString(""); + } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java index a97df92ee2..321fda5e89 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public interface ClientProducerCredits { @@ -40,4 +41,6 @@ public interface ClientProducerCredits { int decrementRefCount(); void releaseOutstanding(); + + SimpleString getAddress(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java index 75543fd9ec..41a08c9d4d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java @@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.spi.core.remoting.SessionContext; -public class ClientProducerCreditsImpl implements ClientProducerCredits { +public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl { + private final Semaphore semaphore; - private final int windowSize; + public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) { + super(session, address, windowSize); - private volatile boolean closed; - - private boolean blocked; - - private final SimpleString address; - - private final ClientSessionInternal session; - - private int pendingCredits; - - private int arriving; - - private int refCount; - - private boolean serverRespondedWithFail; - - private SessionContext sessionContext; - - public ClientProducerCreditsImpl(final ClientSessionInternal session, - final SimpleString address, - final int windowSize) { - this.session = session; - - this.address = address; - - this.windowSize = windowSize / 2; // Doesn't need to be fair since session is single threaded - semaphore = new Semaphore(0, false); + } - @Override - public void init(SessionContext sessionContext) { - // We initial request twice as many credits as we request in subsequent requests - // This allows the producer to keep sending as more arrive, minimising pauses - checkCredits(windowSize); - - this.sessionContext = sessionContext; - - this.sessionContext.linkFlowControl(address, this); - } @Override - public void acquireCredits(final int credits) throws ActiveMQException { - checkCredits(credits); - - boolean tryAcquire; - - synchronized (this) { - tryAcquire = semaphore.tryAcquire(credits); - } - - if (!tryAcquire) { - if (!closed) { - this.blocked = true; - try { - while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) { - // I'm using string concatenation here in case address is null - // better getting a "null" string than a NPE - ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); - } - } catch (InterruptedException interrupted) { - Thread.currentThread().interrupt(); - throw new ActiveMQInterruptedException(interrupted); - } finally { - this.blocked = false; - } - } - } - - synchronized (this) { - pendingCredits -= credits; - } - + protected void afterAcquired(int credits) throws ActiveMQAddressFullException { // check to see if the blocking mode is FAIL on the server synchronized (this) { + super.afterAcquired(credits); + if (serverRespondedWithFail) { serverRespondedWithFail = false; @@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { } @Override - public boolean isBlocked() { - return blocked; - } + protected void actualAcquire(int credits) { - public int getBalance() { - return semaphore.availablePermits(); - } - - @Override - public void receiveCredits(final int credits) { + boolean tryAcquire; synchronized (this) { - arriving -= credits; + tryAcquire = semaphore.tryAcquire(credits); } - semaphore.release(credits); + if (!tryAcquire && !closed) { + this.blocked = true; + try { + while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) { + // I'm using string concatenation here in case address is null + // better getting a "null" string than a NPE + ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); + } + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new ActiveMQInterruptedException(interrupted); + } finally { + this.blocked = false; + } + } } - @Override - public void receiveFailCredits(final int credits) { - serverRespondedWithFail = true; - // receive credits like normal to keep the sender from blocking - receiveCredits(credits); - } @Override public synchronized void reset() { @@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { semaphore.drainPermits(); - int beforeFailure = pendingCredits; - - pendingCredits = 0; - arriving = 0; - - // If we are waiting for more credits than what's configured, then we need to use what we tried before - // otherwise the client may starve as the credit will never arrive - checkCredits(Math.max(windowSize * 2, beforeFailure)); + super.reset(); } + @Override public void close() { - // Closing a producer that is blocking should make it return - closed = true; + super.close(); + // Closing a producer that is blocking should make it return semaphore.release(Integer.MAX_VALUE / 2); } @Override - public synchronized void incrementRefCount() { - refCount++; + public void receiveCredits(final int credits) { + synchronized (this) { + super.receiveCredits(credits); + } + + semaphore.release(credits); } - @Override - public synchronized int decrementRefCount() { - return --refCount; - } @Override public synchronized void releaseOutstanding() { semaphore.drainPermits(); } - private void checkCredits(final int credits) { - int needed = Math.max(credits, windowSize); - - int toRequest = -1; - - synchronized (this) { - if (semaphore.availablePermits() + arriving < needed) { - toRequest = needed - arriving; - - pendingCredits += toRequest; - arriving += toRequest; - } - } - - if (toRequest != -1) { - requestCredits(toRequest); - } + @Override + public int getBalance() { + return semaphore.availablePermits(); } - private void requestCredits(final int credits) { - session.sendProducerCreditsMessage(credits, address); - } + } + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java new file mode 100644 index 0000000000..1fc289df48 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +public interface ClientProducerFlowCallback { + void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits); + + void onCreditsFail(ClientProducerCredits credits); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index d2bcc963cf..721f71717b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; -import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.protocol.core.Channel; @@ -225,7 +225,7 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) { + public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) { // nothing to be done here... Flow control here is done on the core side } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 1f15cc6893..7c31597666 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; -import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -277,7 +277,7 @@ public abstract class SessionContext { public abstract void cleanup(); - public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); + public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits); public abstract boolean isWritable(ReadyListener callback); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 5ecaee61e1..e728d3306e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -31,6 +31,7 @@ package org.apache.activemq.artemis.core.server; * so an INFO message would be 101000 to 101999 */ +import javax.naming.NamingException; import javax.transaction.xa.Xid; import java.io.File; import java.net.SocketAddress; @@ -1291,6 +1292,227 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT) void connectorRefNotFound(String connectorRef, String clusterConnection); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222218, value = "Server disconnecting: {0}", format = Message.Format.MESSAGE_FORMAT) + void disconnectCritical(String reason, @Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222219, value = "File {0} does not exist", + format = Message.Format.MESSAGE_FORMAT) + void fileDoesNotExist(String path); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222220, value = " Error while cleaning paging on queue {0}", format = Message.Format.MESSAGE_FORMAT) + void errorCleaningPagingOnQueue(@Cause Exception e, String queue); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222221, value = "Error while cleaning page, during the commit", format = Message.Format.MESSAGE_FORMAT) + void errorCleaningPagingDuringCommit(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222222, value = "Error while deleting page-complete-record", format = Message.Format.MESSAGE_FORMAT) + void errorDeletingPageCompleteRecord(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222223, value = "Failed to calculate message memory estimate", format = Message.Format.MESSAGE_FORMAT) + void errorCalculateMessageMemoryEstimate(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222224, value = "Failed to calculate scheduled delivery time", format = Message.Format.MESSAGE_FORMAT) + void errorCalculateScheduledDeliveryTime(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222225, value = "Sending unexpected exception to the client", format = Message.Format.MESSAGE_FORMAT) + void sendingUnexpectedExceptionToClient(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222226, value = "Connection configuration is null for connectorName {0}", format = Message.Format.MESSAGE_FORMAT) + void connectionConfigurationIsNull(String connectorName); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222227, value = "Failed to process an event", format = Message.Format.MESSAGE_FORMAT) + void failedToProcessEvent(@Cause NamingException e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222228, value = "Missing replication token on queue", format = Message.Format.MESSAGE_FORMAT) + void missingReplicationTokenOnQueue(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222229, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT) + void failedToPerformRollback(@Cause IllegalStateException e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222230, value = "Failed to send notification", format = Message.Format.MESSAGE_FORMAT) + void failedToSendNotification(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222231, value = "Failed to flush outstanding data from the connection", format = Message.Format.MESSAGE_FORMAT) + void failedToFlushOutstandingDataFromTheConnection(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222232, value = "Unable to acquire lock", format = Message.Format.MESSAGE_FORMAT) + void unableToAcquireLock(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222233, value = "Unable to destroy connection with session metadata", format = Message.Format.MESSAGE_FORMAT) + void unableDestroyConnectionWithSessionMetadata(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222234, value = "Unable to deactivate a callback", format = Message.Format.MESSAGE_FORMAT) + void unableToDeactiveCallback(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222235, value = "Unable to inject a monitor", format = Message.Format.MESSAGE_FORMAT) + void unableToInjectMonitor(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222236, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT) + void unableToFlushDeliveries(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222237, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT) + void unableToCancelRedistributor(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222238, value = "Unable to commit transaction", format = Message.Format.MESSAGE_FORMAT) + void unableToCommitTransaction(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222239, value = "Unable to delete Queue status", format = Message.Format.MESSAGE_FORMAT) + void unableToDeleteQueueStatus(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222240, value = "Unable to pause a Queue", format = Message.Format.MESSAGE_FORMAT) + void unableToPauseQueue(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222241, value = "Unable to resume a Queue", format = Message.Format.MESSAGE_FORMAT) + void unableToResumeQueue(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222242, value = "Unable to obtain message priority, using default ", format = Message.Format.MESSAGE_FORMAT) + void unableToGetMessagePriority(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222243, value = "Unable to extract GroupID from message", format = Message.Format.MESSAGE_FORMAT) + void unableToExtractGroupID(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222244, value = "Unable to check if message expired", format = Message.Format.MESSAGE_FORMAT) + void unableToCheckIfMessageExpired(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222245, value = "Unable to perform post acknowledge", format = Message.Format.MESSAGE_FORMAT) + void unableToPerformPostAcknowledge(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222246, value = "Unable to rollback on close", format = Message.Format.MESSAGE_FORMAT) + void unableToRollbackOnClose(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222247, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT) + void unableToCloseConsumer(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222248, value = "Unable to remove consumer", format = Message.Format.MESSAGE_FORMAT) + void unableToRemoveConsumer(@Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222249, value = "Unable to rollback on TX timed out", format = Message.Format.MESSAGE_FORMAT) + void unableToRollbackOnTxTimedOut(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222250, value = "Unable to delete heuristic completion from storage manager", format = Message.Format.MESSAGE_FORMAT) + void unableToDeleteHeuristicCompletion(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222251, value = "Unable to start replication", format = Message.Format.MESSAGE_FORMAT) + void unableToStartReplication(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222252, value = "Unable to calculate file size", format = Message.Format.MESSAGE_FORMAT) + void unableToCalculateFileSize(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222253, value = "Error while syncing data on largeMessageInSync:: {0}", format = Message.Format.MESSAGE_FORMAT) + void errorWhileSyncingData(String target, @Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222254, value = "Invalid record type {0}", format = Message.Format.MESSAGE_FORMAT) + void invalidRecordType(byte type, @Cause Throwable e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222255, value = "Unable to calculate file store usage", format = Message.Format.MESSAGE_FORMAT) + void unableToCalculateFileStoreUsage(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222256, value = "Failed to unregister acceptors", format = Message.Format.MESSAGE_FORMAT) + void failedToUnregisterAcceptors(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222257, value = "Failed to decrement message reference count", format = Message.Format.MESSAGE_FORMAT) + void failedToDecrementMessageReferenceCount(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222258, value = "Error on deleting queue {0}", format = Message.Format.MESSAGE_FORMAT) + void errorOnDeletingQueue(String queueName, @Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222259, value = "Failed to flush the executor", format = Message.Format.MESSAGE_FORMAT) + void failedToFlushExecutor(@Cause InterruptedException e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222260, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT) + void failedToRollback(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222261, value = "Failed to activate a backup", format = Message.Format.MESSAGE_FORMAT) + void failedToActivateBackup(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222262, value = "Failed to stop cluster manager", format = Message.Format.MESSAGE_FORMAT) + void failedToStopClusterManager(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222263, value = "Failed to stop cluster connection", format = Message.Format.MESSAGE_FORMAT) + void failedToStopClusterConnection(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222264, value = "Failed to process message reference after rollback", format = Message.Format.MESSAGE_FORMAT) + void failedToProcessMessageReferenceAfterRollback(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222265, value = "Failed to finish delivery, unable to lock delivery", format = Message.Format.MESSAGE_FORMAT) + void failedToFinishDelivery(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222266, value = "Failed to send request to the node", format = Message.Format.MESSAGE_FORMAT) + void failedToSendRequestToNode(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222267, value = "Failed to disconnect bindings", format = Message.Format.MESSAGE_FORMAT) + void failedToDisconnectBindings(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222268, value = "Failed to remove a record", format = Message.Format.MESSAGE_FORMAT) + void failedToRemoveRecord(@Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT) + void useFixedValueOnJournalPoolFiles(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT) + void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT) + void journalMessageAckMissingQueueInPreparedTX(Long queueID); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT) + void bridgeAddressFull(String addressName, String bridgeName); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 94e28e6ea9..dbbb4eaa45 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; +import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; @@ -71,7 +73,7 @@ import org.jboss.logging.Logger; * A Core BridgeImpl */ -public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener { +public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback { // Constants ----------------------------------------------------- private static final Logger logger = Logger.getLogger(BridgeImpl.class); @@ -122,6 +124,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final long maxRetryInterval; + private boolean blockedOnFlowControl; + /** * Used when there's a scheduled reconnection */ @@ -254,6 +258,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.notificationService = notificationService; } + @Override + public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) { + this.blockedOnFlowControl = blocked; + if (!blocked) { + queue.deliverAsync(); + } + } + + @Override + public void onCreditsFail(ClientProducerCredits producerCredits) { + ActiveMQServerLogger.LOGGER.bridgeAddressFull("" + producerCredits.getAddress(), "" + this.getName()); + disconnect(); + } + @Override public synchronized void start() throws Exception { if (started) { @@ -536,6 +554,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled queue.deliverAsync(); } + @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (filter != null && !filter.match(ref.getMessage())) { @@ -550,6 +569,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return HandleStatus.BUSY; } + if (blockedOnFlowControl) { + return HandleStatus.BUSY; + } + if (deliveringLargeMessage) { return HandleStatus.BUSY; } @@ -868,6 +891,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } // Session is pre-acknowledge session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); + session.getProducerCreditManager().setCallback(this); sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 59d35dbc4f..03ceb1a11f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -1077,7 +1077,7 @@ public class BridgeTest extends ActiveMQTestBase { ArrayList staticConnectors = new ArrayList<>(); staticConnectors.add(server1tc.getName()); - BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1); List bridgeConfigs = new ArrayList<>(); bridgeConfigs.add(bridgeConfiguration);