ARTEMIS-4483 Avoid log.warn on regular AMQP closing
This commit is contained in:
parent
4a13449056
commit
3cdd6cc672
|
@ -404,7 +404,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
// The exception has to be created in the same thread where it's being called
|
||||
// as to avoid a different stack trace cause
|
||||
final ActiveMQException ex = ActiveMQClientMessageBundle.BUNDLE.channelDisconnected();
|
||||
|
|
|
@ -86,7 +86,7 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
|
|||
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
|
||||
synchronized (this) {
|
||||
if (active) {
|
||||
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel())));
|
||||
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel()), true));
|
||||
|
||||
active = false;
|
||||
}
|
||||
|
|
|
@ -92,6 +92,11 @@ public class NettyConnection implements Connection {
|
|||
this.batchingEnabled = batchingEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventLoop getEventLoop() {
|
||||
return channel.eventLoop();
|
||||
}
|
||||
|
||||
private static void waitFor(ChannelPromise promise, long millis) {
|
||||
try {
|
||||
final boolean completed = promise.await(millis);
|
||||
|
@ -237,7 +242,7 @@ public class NettyConnection implements Connection {
|
|||
|
||||
closed = true;
|
||||
|
||||
listener.connectionDestroyed(getID());
|
||||
listener.connectionDestroyed(getID(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -832,7 +832,7 @@ public class NettyConnector extends AbstractConnector {
|
|||
channelClazz = null;
|
||||
|
||||
for (Connection connection : connections.values()) {
|
||||
listener.connectionDestroyed(connection.getID());
|
||||
listener.connectionDestroyed(connection.getID(), false);
|
||||
}
|
||||
|
||||
connections.clear();
|
||||
|
@ -1231,13 +1231,13 @@ public class NettyConnector extends AbstractConnector {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
if (connections.remove(connectionID) != null) {
|
||||
// Execute on different thread to avoid deadlocks
|
||||
closeExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
listener.connectionDestroyed(connectionID);
|
||||
listener.connectionDestroyed(connectionID, failed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -232,6 +232,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
|
|||
fail(me, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
fail(new ActiveMQException());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future asyncFail(final ActiveMQException me) {
|
||||
|
||||
|
|
|
@ -133,6 +133,8 @@ public interface RemotingConnection extends BufferHandler {
|
|||
*/
|
||||
void fail(ActiveMQException me);
|
||||
|
||||
void close();
|
||||
|
||||
/** Same thing as fail, but using an executor.
|
||||
* semantic of send here, is asynchrounous.
|
||||
* @param me
|
||||
|
|
|
@ -46,7 +46,7 @@ public interface BaseConnectionLifeCycleListener<ProtocolClass> {
|
|||
*
|
||||
* @param connectionID the connection being destroyed.
|
||||
*/
|
||||
void connectionDestroyed(Object connectionID);
|
||||
void connectionDestroyed(Object connectionID, boolean failed);
|
||||
|
||||
/**
|
||||
* Called when an error occurs on the connection.
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.spi.core.remoting;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.EventLoop;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||
|
@ -75,6 +76,8 @@ public interface Connection {
|
|||
*/
|
||||
Object getID();
|
||||
|
||||
EventLoop getEventLoop();
|
||||
|
||||
/**
|
||||
* writes the buffer to the connection and if flush is true request to flush the buffer
|
||||
* (and any previous un-flushed ones) into the wire.
|
||||
|
|
|
@ -105,6 +105,28 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (destroyed) {
|
||||
return;
|
||||
}
|
||||
|
||||
destroyed = true;
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
try {
|
||||
logger.debug("Connection regular close. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());
|
||||
} catch (Throwable e) { // just to avoid a possible NPE from the debug statement itself
|
||||
logger.debug(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
amqpConnection.runNow(() -> {
|
||||
callClosingListeners();
|
||||
internalClose();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
synchronized (this) {
|
||||
|
|
|
@ -62,7 +62,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(Object connectionID) {
|
||||
public void connectionDestroyed(Object connectionID, boolean failed) {
|
||||
RemotingConnection connection = connectionMap.remove(connectionID);
|
||||
if (connection != null) {
|
||||
logger.info("Connection {} destroyed", connection.getRemoteAddress());
|
||||
|
|
|
@ -787,7 +787,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(Object connectionID) {
|
||||
public void connectionDestroyed(Object connectionID, boolean failed) {
|
||||
server.getRemotingService().removeConnection(connectionID);
|
||||
redoConnection();
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class AMQPBrokerConnectionChannelHandler extends ChannelDuplexHandler {
|
|||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
synchronized (this) {
|
||||
if (active) {
|
||||
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel())));
|
||||
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel()), true));
|
||||
super.channelInactive(ctx);
|
||||
active = false;
|
||||
}
|
||||
|
|
|
@ -108,10 +108,10 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(Object connectionID) {
|
||||
public void connectionDestroyed(Object connectionID, boolean failed) {
|
||||
for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
|
||||
if (connection.getConnection() != null && connectionID.equals(connection.getConnection().getID())) {
|
||||
connection.connectionDestroyed(connectionID);
|
||||
connection.connectionDestroyed(connectionID, failed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import io.netty.buffer.ByteBuf;
|
|||
import io.netty.channel.EventLoop;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
|
@ -50,7 +49,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurity
|
|||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
|
||||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExecutorNettyAdapter;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
|
||||
|
@ -168,12 +166,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
|
||||
this.scheduledPool = scheduledPool;
|
||||
connectionCallback.setConnection(this);
|
||||
EventLoop nettyExecutor;
|
||||
if (connectionCallback.getTransportConnection() instanceof NettyConnection) {
|
||||
nettyExecutor = ((NettyConnection) connectionCallback.getTransportConnection()).getNettyChannel().eventLoop();
|
||||
} else {
|
||||
nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
|
||||
}
|
||||
EventLoop nettyExecutor = connectionCallback.getTransportConnection().getEventLoop();
|
||||
this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection && saslClientFactory == null);
|
||||
handler.addEventHandler(this);
|
||||
Transport transport = handler.getTransport();
|
||||
|
|
|
@ -1,229 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.ProgressivePromise;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.concurrent.ScheduledFuture;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
|
||||
/** Test cases may supply a simple executor instead of the real Netty Executor
|
||||
* On that case this is a simple adapter for what's needed from these tests.
|
||||
* Not intended to be used in production.
|
||||
*
|
||||
* TODO: This could be refactored out of the main codebase but at a high cost.
|
||||
* We may do it some day if we find an easy way that won't clutter the code too much.
|
||||
* */
|
||||
public class ExecutorNettyAdapter implements EventLoop, AutoCloseable {
|
||||
|
||||
final ArtemisExecutor executor;
|
||||
|
||||
public ExecutorNettyAdapter(ArtemisExecutor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventLoopGroup parent() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventLoop next() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register(Channel channel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register(ChannelPromise promise) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean inEventLoop() {
|
||||
return inEventLoop(Thread.currentThread());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean inEventLoop(Thread thread) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Promise<V> newPromise() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ProgressivePromise<V> newProgressivePromise() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newSucceededFuture(V result) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newFailedFuture(Throwable cause) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShuttingDown() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> shutdownGracefully() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> terminationFuture() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Runnable> shutdownNow() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<EventExecutor> iterator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
execute(task);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) {
|
||||
execute(task);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
|
||||
long timeout,
|
||||
TimeUnit unit) throws InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
|
||||
long timeout,
|
||||
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
executor.execute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// noop
|
||||
}
|
||||
}
|
|
@ -201,6 +201,10 @@ public class ManagementRemotingConnection implements RemotingConnection {
|
|||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public SessionCallback callback = new SessionCallback() {
|
||||
@Override
|
||||
public boolean hasCredits(ServerConsumer consumerID) {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
|
||||
|
@ -159,7 +160,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
|||
}
|
||||
|
||||
for (Connection connection : connections.values()) {
|
||||
listener.connectionDestroyed(connection.getID());
|
||||
listener.connectionDestroyed(connection.getID(), true);
|
||||
}
|
||||
|
||||
connections.clear();
|
||||
|
@ -220,7 +221,7 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
|||
public void connect(final String connectionID,
|
||||
final BufferHandler remoteHandler,
|
||||
final InVMConnector connector,
|
||||
final Executor clientExecutor) {
|
||||
final ArtemisExecutor clientExecutor) {
|
||||
if (!started) {
|
||||
throw new IllegalStateException("Acceptor is not started");
|
||||
}
|
||||
|
@ -287,12 +288,12 @@ public final class InVMAcceptor extends AbstractAcceptor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
InVMConnection connection = (InVMConnection) connections.remove(connectionID);
|
||||
|
||||
if (connection != null) {
|
||||
|
||||
listener.connectionDestroyed(connectionID);
|
||||
listener.connectionDestroyed(connectionID, failed);
|
||||
|
||||
// Execute on different thread after all the packets are sent, to avoid deadlocks
|
||||
connection.getExecutor().execute(new Runnable() {
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.EventLoop;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
|
@ -36,6 +37,8 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
@ -57,7 +60,7 @@ public class InVMConnection implements Connection {
|
|||
|
||||
private final int serverID;
|
||||
|
||||
private final Executor executor;
|
||||
private final ArtemisExecutor executor;
|
||||
|
||||
private volatile boolean closing;
|
||||
|
||||
|
@ -72,7 +75,7 @@ public class InVMConnection implements Connection {
|
|||
public InVMConnection(final int serverID,
|
||||
final BufferHandler handler,
|
||||
final BaseConnectionLifeCycleListener listener,
|
||||
final Executor executor) {
|
||||
final ArtemisExecutor executor) {
|
||||
this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
|
||||
}
|
||||
|
||||
|
@ -80,7 +83,7 @@ public class InVMConnection implements Connection {
|
|||
final String id,
|
||||
final BufferHandler handler,
|
||||
final BaseConnectionLifeCycleListener listener,
|
||||
final Executor executor) {
|
||||
final ArtemisExecutor executor) {
|
||||
this(serverID, id, handler, listener, executor, null);
|
||||
}
|
||||
|
||||
|
@ -88,7 +91,7 @@ public class InVMConnection implements Connection {
|
|||
final String id,
|
||||
final BufferHandler handler,
|
||||
final BaseConnectionLifeCycleListener listener,
|
||||
final Executor executor,
|
||||
final ArtemisExecutor executor,
|
||||
final ActiveMQPrincipal defaultActiveMQPrincipal) {
|
||||
|
||||
this.serverID = serverID;
|
||||
|
@ -147,7 +150,7 @@ public class InVMConnection implements Connection {
|
|||
|
||||
synchronized (this) {
|
||||
if (!closed) {
|
||||
listener.connectionDestroyed(id);
|
||||
listener.connectionDestroyed(id, false);
|
||||
|
||||
closed = true;
|
||||
}
|
||||
|
@ -248,6 +251,11 @@ public class InVMConnection implements Connection {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventLoop getEventLoop() {
|
||||
throw new NotImplementedException("Event Loop and Netty is not supported on the inVM Connection.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteAddress() {
|
||||
return "invm:" + serverID;
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener
|
|||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -169,7 +170,7 @@ public class InVMConnector extends AbstractConnector {
|
|||
}
|
||||
|
||||
for (Connection connection : connections.values()) {
|
||||
listener.connectionDestroyed(connection.getID());
|
||||
listener.connectionDestroyed(connection.getID(), false);
|
||||
}
|
||||
|
||||
started = false;
|
||||
|
@ -232,7 +233,7 @@ public class InVMConnector extends AbstractConnector {
|
|||
// This may be an injection point for mocks on tests
|
||||
protected Connection internalCreateConnection(final BufferHandler handler,
|
||||
final ClientConnectionLifeCycleListener listener,
|
||||
final Executor serverExecutor) {
|
||||
final ArtemisExecutor serverExecutor) {
|
||||
// No acceptor on a client connection
|
||||
InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor);
|
||||
inVMConnection.setEnableBufferPooling(bufferPoolingEnabled);
|
||||
|
@ -267,7 +268,7 @@ public class InVMConnector extends AbstractConnector {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
if (connections.remove(connectionID) != null) {
|
||||
// Close the corresponding connection on the other side
|
||||
acceptor.disconnect((String) connectionID);
|
||||
|
@ -276,7 +277,7 @@ public class InVMConnector extends AbstractConnector {
|
|||
closeExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
listener.connectionDestroyed(connectionID);
|
||||
listener.connectionDestroyed(connectionID, failed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -794,7 +794,7 @@ public class NettyAcceptor extends AbstractAcceptor {
|
|||
channelClazz = null;
|
||||
|
||||
for (Connection connection : connections.values()) {
|
||||
listener.connectionDestroyed(connection.getID());
|
||||
listener.connectionDestroyed(connection.getID(), true);
|
||||
}
|
||||
|
||||
connections.clear();
|
||||
|
@ -969,9 +969,9 @@ public class NettyAcceptor extends AbstractAcceptor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
if (connections.remove(connectionID) != null) {
|
||||
listener.connectionDestroyed(connectionID);
|
||||
listener.connectionDestroyed(connectionID, failed);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -591,12 +591,16 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Connection removed {} from server {}", connectionID, this.server, new Exception("trace"));
|
||||
}
|
||||
|
||||
issueFailure(connectionID, new ActiveMQRemoteDisconnectException());
|
||||
if (failed) {
|
||||
issueFailure(connectionID, new ActiveMQRemoteDisconnectException());
|
||||
} else {
|
||||
issueClose(connectionID);
|
||||
}
|
||||
}
|
||||
|
||||
private void issueFailure(Object connectionID, ActiveMQException e) {
|
||||
|
@ -619,6 +623,26 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
}
|
||||
}
|
||||
|
||||
private void issueClose(Object connectionID) {
|
||||
ConnectionEntry conn = connections.get(connectionID);
|
||||
|
||||
if (conn != null && !conn.connection.isSupportReconnect()) {
|
||||
RemotingConnection removedConnection = removeConnection(connectionID);
|
||||
if (removedConnection != null) {
|
||||
try {
|
||||
if (server.hasBrokerConnectionPlugins()) {
|
||||
server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));
|
||||
}
|
||||
} catch (ActiveMQException t) {
|
||||
logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
conn.connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||
issueFailure(connectionID, me);
|
||||
|
|
|
@ -452,8 +452,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
}
|
||||
}
|
||||
|
||||
consumers.clear();
|
||||
serverProducers.clear();
|
||||
try {
|
||||
if (consumers != null) {
|
||||
consumers.clear();
|
||||
}
|
||||
if (serverProducers != null) {
|
||||
serverProducers.clear();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
||||
|
||||
if (closeables != null) {
|
||||
for (Closeable closeable : closeables) {
|
||||
|
@ -1146,7 +1155,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
}
|
||||
logger.debug("deleting temporary queue {}", bindingName);
|
||||
AddressInfo addressInfo = server.getAddressInfo(binding.getAddress());
|
||||
server.destroyQueue(bindingName, null, false, false, addressInfo == null || addressInfo.isTemporary());
|
||||
try {
|
||||
server.destroyQueue(bindingName, null, false, false, addressInfo == null || addressInfo.isTemporary());
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
if (observer != null) {
|
||||
observer.tempQueueDeleted(bindingName);
|
||||
}
|
||||
|
|
|
@ -198,11 +198,24 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
|
|||
@Override
|
||||
public void afterCreateSession(ServerSession session) throws ActiveMQException {
|
||||
if (logAll || logSessionEvents) {
|
||||
LoggingActiveMQServerPluginLogger.LOGGER.afterCreateSession((session == null ? UNAVAILABLE : session.getName()), (session == null ? UNAVAILABLE : session.getConnectionID()));
|
||||
LoggingActiveMQServerPluginLogger.LOGGER.afterCreateSession((session == null ? UNAVAILABLE : session.getName()), (session == null ? UNAVAILABLE : session.getConnectionID()), getRemoteAddress(session));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String getRemoteAddress(ServerSession session) {
|
||||
if (session == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
RemotingConnection remotingConnection = session.getRemotingConnection();
|
||||
if (remotingConnection == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return remotingConnection.getRemoteAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a session is closed
|
||||
*
|
||||
|
@ -227,7 +240,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
|
|||
@Override
|
||||
public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
|
||||
if (logAll || logSessionEvents) {
|
||||
LoggingActiveMQServerPluginLogger.LOGGER.afterCloseSession((session == null ? UNAVAILABLE : session.getName()), failed);
|
||||
LoggingActiveMQServerPluginLogger.LOGGER.afterCloseSession((session == null ? UNAVAILABLE : session.getName()), failed, getRemoteAddress(session));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,11 +50,11 @@ public interface LoggingActiveMQServerPluginLogger {
|
|||
@LogMessage(id = 841001, value = "destroyed connection: {}", level = LogMessage.Level.INFO)
|
||||
void afterDestroyConnection(RemotingConnection connection);
|
||||
|
||||
@LogMessage(id = 841002, value = "created session name: {}, session connectionID: {}", level = LogMessage.Level.INFO)
|
||||
void afterCreateSession(String sessionName, Object sesssionConnectionID);
|
||||
@LogMessage(id = 841002, value = "created session name: {}, session connectionID: {}, remote address {}", level = LogMessage.Level.INFO)
|
||||
void afterCreateSession(String sessionName, Object sesssionConnectionID, String remoteAddress);
|
||||
|
||||
@LogMessage(id = 841003, value = "closed session with session name: {}, failed: {}", level = LogMessage.Level.INFO)
|
||||
void afterCloseSession(String sessionName, boolean sesssionConnectionID);
|
||||
@LogMessage(id = 841003, value = "closed session with session name: {}, failed: {}, RemoteAddress: {}", level = LogMessage.Level.INFO)
|
||||
void afterCloseSession(String sessionName, boolean sesssionConnectionID, String remoteAddress);
|
||||
|
||||
@LogMessage(id = 841004, value = "added session metadata for session name : {}, key: {}, data: {}", level = LogMessage.Level.INFO)
|
||||
void afterSessionMetadataAdded(String sessionName, String key, String data);
|
||||
|
|
|
@ -158,11 +158,10 @@ public class ConnectionDroppedTest extends ActiveMQTestBase {
|
|||
|
||||
Assert.assertFalse(loggerHandler.findText("AMQ212037"));
|
||||
|
||||
// TODO: Fix these as part of ARTEMIS-4483
|
||||
/*Assert.assertFalse(loggerHandler.findText("Connection failure"));
|
||||
Assert.assertFalse(loggerHandler.findText("Connection failure"));
|
||||
Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
|
||||
Assert.assertFalse(loggerHandler.findText("AMQ222061"));
|
||||
Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
|
||||
Assert.assertFalse(loggerHandler.findText("AMQ222107"));
|
||||
|
||||
Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
|
||||
Wait.assertEquals(0, server::getConnectionCount, 5000);
|
||||
|
|
|
@ -57,7 +57,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -83,7 +83,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -155,7 +155,7 @@ public class NettyConnectionTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -107,7 +107,7 @@ public class SocksProxyTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -170,7 +170,7 @@ public class SocksProxyTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -214,7 +214,7 @@ public class SocksProxyTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void connectionDestroyed(final Object connectionID) {
|
||||
public void connectionDestroyed(final Object connectionID, boolean failed) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue