This commit is contained in:
Clebert Suconic 2018-06-11 18:37:37 -04:00
commit c5e33915fd
6 changed files with 185 additions and 9 deletions

View File

@ -45,12 +45,15 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import io.netty.channel.ChannelPipeline;
import org.jboss.logging.Logger;
/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/
public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor, ActiveMQProtonRemotingConnection> implements NotificationListener {
private static final Logger logger = Logger.getLogger(ProtonProtocolManager.class);
private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
private final List<AmqpInterceptor> incomingInterceptors = new ArrayList<>();
@ -72,6 +75,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private String saslLoginConfigScope = "amqp-sasl-gssapi";
private Long amqpIdleTimeout;
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
@ -115,6 +121,17 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
return false;
}
public Long getAmqpIdleTimeout() {
return amqpIdleTimeout;
}
public ProtonProtocolManager setAmqpIdleTimeout(Long ttl) {
logger.debug("Setting up " + ttl + " as the connectionTtl");
this.amqpIdleTimeout = ttl;
return this;
}
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
@ -124,6 +141,14 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
ttl = server.getConfiguration().getConnectionTTLOverride();
}
if (getAmqpIdleTimeout() != null) {
ttl = getAmqpIdleTimeout().longValue();
}
if (ttl < 0) {
ttl = 0;
}
String id = server.getConfiguration().getName();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
@ -136,7 +161,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
connectionCallback.setProtonConnectionDelegate(delegate);
ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
// connection entry only understands -1 otherwise we would see disconnects for no reason
ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl <= 0 ? -1 : ttl);
return entry;
}

View File

@ -388,8 +388,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
long rescheduleAt = handler.tick(false);
if (rescheduleAt != 0) {
Long rescheduleAt = handler.tick(false);
if (rescheduleAt == null) {
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
} else if (rescheduleAt != 0) {
scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
}
}

View File

@ -88,7 +88,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> {
this.readyListener = () -> this.flushExecutor.execute(() -> {
flush();
});
this.creationTime = System.currentTimeMillis();
@ -105,8 +105,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
connection.collect(collector);
}
public long tick(boolean firstTick) {
public Long tick(boolean firstTick) {
if (firstTick) {
// the first tick needs to guarantee a lock here
lock.lock();
} else {
if (!lock.tryLock()) {
log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
// if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
return null;
}
}
try {
if (!firstTick) {
try {
@ -122,7 +131,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
transport.close();
connection.setCondition(new ErrorCondition());
}
return 0;
return 0L;
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
} finally {

View File

@ -127,3 +127,32 @@ message for later delivery:
If both annotations are present in the same message then the broker will prefer
the more specific `x-opt-delivery-time` value.
## Configuring AMQP Idle Timeout
It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout in milliseconds on the acceptor.
This will make the server to send an AMQP frame open to the client, with your configured timeout / 2.
So, if you configured your AMQP Idle Timeout to be 60000, the server will tell the client to send frames every 30,000 milliseconds.
```xml
<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>
```
### Disabling Keep alive checks
if you set amqpIdleTimeout=0 that will tell clients to not sending keep alive packets towards the server. On this case
you will rely on TCP to determine when the socket needs to be closed.
```xml
<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>
```
This contains a real example for configuring amqpIdleTimeout:
```xml
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10</acceptor>
```

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -27,19 +30,42 @@ import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test handling of heartbeats requested by the broker.
*/
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
@RunWith(Parameterized.class)
public class AmqpBrokerRequestedHearbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 1000;
@Parameterized.Parameters(name = "useOverride={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Parameterized.Parameter(0)
public boolean useOverride;
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
if (!useOverride) {
params.put("amqpIdleTimeout", "" + TEST_IDLE_TIMEOUT);
}
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setConnectionTtlCheckInterval(TEST_IDLE_TIMEOUT / 3);
if (useOverride) {
server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT);
}
}
@Test(timeout = 60000)
public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {
@Parameterized.Parameters(name = "useOverride={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Parameterized.Parameter(0)
public boolean useOverride;
@Override
protected void addConfiguration(ActiveMQServer server) {
if (useOverride) {
server.getConfiguration().setConnectionTTLOverride(0);
} else {
server.getConfiguration().setConnectionTtlCheckInterval(500);
}
}
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
if (!useOverride) {
params.put("amqpIdleTimeout", "0");
}
}
@Test(timeout = 60000)
public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout());
}
});
AmqpConnection connection = addConnection(client.connect());
assertNotNull(connection);
connection.getStateInspector().assertValid();
connection.close();
}
}