diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java new file mode 100644 index 0000000000..25b2d97c23 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; + +public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits { + + protected int pendingCredits; + + private final int windowSize; + + protected volatile boolean closed; + + protected boolean blocked; + + protected final SimpleString address; + + private final ClientSessionInternal session; + + protected int arriving; + + private int refCount; + + protected boolean serverRespondedWithFail; + + protected SessionContext sessionContext; + + public AbstractProducerCreditsImpl(final ClientSessionInternal session, + final SimpleString address, + final int windowSize) { + this.session = session; + + this.address = address; + + this.windowSize = windowSize / 2; + } + + @Override + public SimpleString getAddress() { + return address; + } + + @Override + public void init(SessionContext sessionContext) { + // We initial request twice as many credits as we request in subsequent requests + // This allows the producer to keep sending as more arrive, minimising pauses + checkCredits(windowSize); + + this.sessionContext = sessionContext; + + this.sessionContext.linkFlowControl(address, this); + } + + @Override + public void acquireCredits(final int credits) throws ActiveMQException { + checkCredits(credits); + + actualAcquire(credits); + + afterAcquired(credits); + + } + + protected void afterAcquired(int credits) throws ActiveMQAddressFullException { + // check to see if the blocking mode is FAIL on the server + synchronized (this) { + pendingCredits -= credits; + } + } + + protected abstract void actualAcquire(int credits); + + @Override + public boolean isBlocked() { + return blocked; + } + + @Override + public void receiveFailCredits(final int credits) { + serverRespondedWithFail = true; + // receive credits like normal to keep the sender from blocking + receiveCredits(credits); + } + + + @Override + public void receiveCredits(final int credits) { + synchronized (this) { + arriving -= credits; + } + } + + + @Override + public synchronized void reset() { + // Any pendingCredits credits from before failover won't arrive, so we re-initialise + + int beforeFailure = pendingCredits; + + pendingCredits = 0; + arriving = 0; + + // If we are waiting for more credits than what's configured, then we need to use what we tried before + // otherwise the client may starve as the credit will never arrive + checkCredits(Math.max(windowSize * 2, beforeFailure)); + } + + @Override + public void close() { + // Closing a producer that is blocking should make it return + closed = true; + } + + @Override + public synchronized void incrementRefCount() { + refCount++; + } + + @Override + public synchronized int decrementRefCount() { + return --refCount; + } + + public abstract int getBalance(); + + protected void checkCredits(final int credits) { + int needed = Math.max(credits, windowSize); + + int toRequest = -1; + + synchronized (this) { + if (getBalance() + arriving < needed) { + toRequest = needed - arriving; + + pendingCredits += toRequest; + arriving += toRequest; + } + } + + if (toRequest != -1) { + requestCredits(toRequest); + } + } + + private void requestCredits(final int credits) { + session.sendProducerCreditsMessage(credits, address); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java new file mode 100644 index 0000000000..a49122f1e7 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; + +public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl { + + int balance; + + final ClientProducerFlowCallback callback; + + public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize, + ClientProducerFlowCallback callback) { + super(session, address, windowSize); + balance = windowSize; + this.callback = callback; + } + + @Override + protected synchronized void actualAcquire(int credits) { + synchronized (this) { + balance -= credits; + if (balance <= 0) { + callback.onCreditsFlow(true, this); + } + } + + } + + @Override + public int getBalance() { + return balance; + } + + @Override + public void receiveCredits(int credits) { + synchronized (this) { + super.receiveCredits(credits); + balance += credits; + callback.onCreditsFlow(balance <= 0, this); + } + + } + + + @Override + public void receiveFailCredits(final int credits) { + super.receiveFailCredits(credits); + callback.onCreditsFail(this); + } + + @Override + public void releaseOutstanding() { + synchronized (this) { + balance = 0; + callback.onCreditsFlow(true, this); + } + + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java index 3c10e1a236..bb65f72c7a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java @@ -36,4 +36,8 @@ public interface ClientProducerCreditManager { int creditsMapSize(); int unReferencedCreditsSize(); + + /** This will determine the flow control as asynchronous, + * no actual block should happen instead a callback will be sent whenever blockages change */ + void setCallback(ClientProducerFlowCallback callback); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java index 80b255ffd2..cd2db9cc2c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java @@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana private int windowSize; + private ClientProducerFlowCallback callback; + public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) { this.session = session; this.windowSize = windowSize; } + + /** This will determine the flow control as asynchronous, + * no actual block should happen instead a callback will be sent whenever blockages change */ + @Override + public void setCallback(ClientProducerFlowCallback callback) { + this.callback = callback; + } + @Override public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, @@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana if (credits == null) { // Doesn't need to be fair since session is single threaded - credits = new ClientProducerCreditsImpl(session, address, windowSize); + credits = build(address); needInit = true; producerCredits.put(address, credits); @@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana } } + private ClientProducerCredits build(SimpleString address) { + if (callback != null) { + return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback); + } else { + return new ClientProducerCreditsImpl(session, address, windowSize); + } + } + @Override public synchronized void returnCredits(final SimpleString address) { ClientProducerCredits credits = producerCredits.get(address); @@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana public void releaseOutstanding() { } + @Override + public SimpleString getAddress() { + return SimpleString.toSimpleString(""); + } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java index a97df92ee2..321fda5e89 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public interface ClientProducerCredits { @@ -40,4 +41,6 @@ public interface ClientProducerCredits { int decrementRefCount(); void releaseOutstanding(); + + SimpleString getAddress(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java index 75543fd9ec..41a08c9d4d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java @@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.spi.core.remoting.SessionContext; -public class ClientProducerCreditsImpl implements ClientProducerCredits { +public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl { + private final Semaphore semaphore; - private final int windowSize; + public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) { + super(session, address, windowSize); - private volatile boolean closed; - - private boolean blocked; - - private final SimpleString address; - - private final ClientSessionInternal session; - - private int pendingCredits; - - private int arriving; - - private int refCount; - - private boolean serverRespondedWithFail; - - private SessionContext sessionContext; - - public ClientProducerCreditsImpl(final ClientSessionInternal session, - final SimpleString address, - final int windowSize) { - this.session = session; - - this.address = address; - - this.windowSize = windowSize / 2; // Doesn't need to be fair since session is single threaded - semaphore = new Semaphore(0, false); + } - @Override - public void init(SessionContext sessionContext) { - // We initial request twice as many credits as we request in subsequent requests - // This allows the producer to keep sending as more arrive, minimising pauses - checkCredits(windowSize); - - this.sessionContext = sessionContext; - - this.sessionContext.linkFlowControl(address, this); - } @Override - public void acquireCredits(final int credits) throws ActiveMQException { - checkCredits(credits); - - boolean tryAcquire; - - synchronized (this) { - tryAcquire = semaphore.tryAcquire(credits); - } - - if (!tryAcquire) { - if (!closed) { - this.blocked = true; - try { - while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) { - // I'm using string concatenation here in case address is null - // better getting a "null" string than a NPE - ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); - } - } catch (InterruptedException interrupted) { - Thread.currentThread().interrupt(); - throw new ActiveMQInterruptedException(interrupted); - } finally { - this.blocked = false; - } - } - } - - synchronized (this) { - pendingCredits -= credits; - } - + protected void afterAcquired(int credits) throws ActiveMQAddressFullException { // check to see if the blocking mode is FAIL on the server synchronized (this) { + super.afterAcquired(credits); + if (serverRespondedWithFail) { serverRespondedWithFail = false; @@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { } @Override - public boolean isBlocked() { - return blocked; - } + protected void actualAcquire(int credits) { - public int getBalance() { - return semaphore.availablePermits(); - } - - @Override - public void receiveCredits(final int credits) { + boolean tryAcquire; synchronized (this) { - arriving -= credits; + tryAcquire = semaphore.tryAcquire(credits); } - semaphore.release(credits); + if (!tryAcquire && !closed) { + this.blocked = true; + try { + while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) { + // I'm using string concatenation here in case address is null + // better getting a "null" string than a NPE + ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); + } + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + throw new ActiveMQInterruptedException(interrupted); + } finally { + this.blocked = false; + } + } } - @Override - public void receiveFailCredits(final int credits) { - serverRespondedWithFail = true; - // receive credits like normal to keep the sender from blocking - receiveCredits(credits); - } @Override public synchronized void reset() { @@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { semaphore.drainPermits(); - int beforeFailure = pendingCredits; - - pendingCredits = 0; - arriving = 0; - - // If we are waiting for more credits than what's configured, then we need to use what we tried before - // otherwise the client may starve as the credit will never arrive - checkCredits(Math.max(windowSize * 2, beforeFailure)); + super.reset(); } + @Override public void close() { - // Closing a producer that is blocking should make it return - closed = true; + super.close(); + // Closing a producer that is blocking should make it return semaphore.release(Integer.MAX_VALUE / 2); } @Override - public synchronized void incrementRefCount() { - refCount++; + public void receiveCredits(final int credits) { + synchronized (this) { + super.receiveCredits(credits); + } + + semaphore.release(credits); } - @Override - public synchronized int decrementRefCount() { - return --refCount; - } @Override public synchronized void releaseOutstanding() { semaphore.drainPermits(); } - private void checkCredits(final int credits) { - int needed = Math.max(credits, windowSize); - - int toRequest = -1; - - synchronized (this) { - if (semaphore.availablePermits() + arriving < needed) { - toRequest = needed - arriving; - - pendingCredits += toRequest; - arriving += toRequest; - } - } - - if (toRequest != -1) { - requestCredits(toRequest); - } + @Override + public int getBalance() { + return semaphore.availablePermits(); } - private void requestCredits(final int credits) { - session.sendProducerCreditsMessage(credits, address); - } + } + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java new file mode 100644 index 0000000000..1fc289df48 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.client.impl; + +public interface ClientProducerFlowCallback { + void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits); + + void onCreditsFail(ClientProducerCredits credits); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 6037925e3d..f4033ec4b1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; -import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.Channel; @@ -234,7 +234,7 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) { + public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) { // nothing to be done here... Flow control here is done on the core side } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 058e6063d5..65713358d6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; -import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; @@ -342,7 +342,7 @@ public abstract class SessionContext { public abstract void cleanup(); - public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); + public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits); public abstract boolean isWritable(ReadyListener callback); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 353403f65d..c3f0a7d704 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1585,6 +1585,10 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT) void journalMessageAckMissingQueueInPreparedTX(Long queueID); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT) + void bridgeAddressFull(String addressName, String bridgeName); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 9815d37251..01596fd1d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; +import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; @@ -70,7 +72,7 @@ import org.jboss.logging.Logger; * A Core BridgeImpl */ -public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener { +public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback { // Constants ----------------------------------------------------- private static final Logger logger = Logger.getLogger(BridgeImpl.class); @@ -119,6 +121,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private final long maxRetryInterval; + private boolean blockedOnFlowControl; + /** * Used when there's a scheduled reconnection */ @@ -253,6 +257,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled this.notificationService = notificationService; } + @Override + public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) { + this.blockedOnFlowControl = blocked; + if (!blocked) { + queue.deliverAsync(); + } + } + + @Override + public void onCreditsFail(ClientProducerCredits producerCredits) { + ActiveMQServerLogger.LOGGER.bridgeAddressFull("" + producerCredits.getAddress(), "" + this.getName()); + disconnect(); + } + @Override public long sequentialID() { return sequentialID; @@ -541,6 +559,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled queue.deliverAsync(); } + @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (filter != null && !filter.match(ref.getMessage())) { @@ -555,6 +574,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return HandleStatus.BUSY; } + if (blockedOnFlowControl) { + return HandleStatus.BUSY; + } + if (deliveringLargeMessage) { return HandleStatus.BUSY; } @@ -876,6 +899,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } // Session is pre-acknowledge session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); + session.getProducerCreditManager().setCallback(this); sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 219aa15698..ae60a6150a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -1187,7 +1187,7 @@ public class BridgeTest extends ActiveMQTestBase { ArrayList staticConnectors = new ArrayList<>(); staticConnectors.add(server1tc.getName()); - BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1); List bridgeConfigs = new ArrayList<>(); bridgeConfigs.add(bridgeConfiguration);