ARTEMIS-728 Broker doesn't support unique jms client-id (qpid-jms client)
This commit is contained in:
parent
1f392da88e
commit
406d09d986
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.proton.plug;
|
package org.apache.activemq.artemis.core.protocol.proton.plug;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -23,19 +25,35 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
|
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
|
||||||
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
|
import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
|
import org.proton.plug.handler.ExtCapability;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
|
import org.proton.plug.sasl.PlainSASLResult;
|
||||||
|
|
||||||
|
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
|
||||||
|
import static org.proton.plug.AmqpSupport.INVALID_FIELD;
|
||||||
|
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
|
||||||
|
|
||||||
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
|
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
|
||||||
|
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
|
||||||
|
|
||||||
private final ProtonProtocolManager manager;
|
private final ProtonProtocolManager manager;
|
||||||
|
|
||||||
|
@ -49,6 +67,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
|
||||||
|
|
||||||
private final Executor closeExecutor;
|
private final Executor closeExecutor;
|
||||||
|
|
||||||
|
private ServerSession internalSession;
|
||||||
|
|
||||||
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
|
public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
|
||||||
Connection connection,
|
Connection connection,
|
||||||
Executor closeExecutor) {
|
Executor closeExecutor) {
|
||||||
|
@ -85,8 +105,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
|
||||||
return supportsAnonymous;
|
return supportsAnonymous;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
//This internal core session is used to represent the connection
|
||||||
|
//in core server. It is used to identify unique clientIDs.
|
||||||
|
//Note the Qpid-JMS client does create a initial session
|
||||||
|
//for each connection. However is comes in as a Begin
|
||||||
|
//After Open. This makes it unusable for this purpose
|
||||||
|
//as we need to decide the uniqueness in response to
|
||||||
|
//Open, and the checking Uniqueness and adding the unique
|
||||||
|
//client-id to server need to be atomic.
|
||||||
|
if (internalSession == null) {
|
||||||
|
SASLResult saslResult = amqpConnection.getSASLResult();
|
||||||
|
String user = null;
|
||||||
|
String passcode = null;
|
||||||
|
if (saslResult != null) {
|
||||||
|
user = saslResult.getUser();
|
||||||
|
if (saslResult instanceof PlainSASLResult) {
|
||||||
|
passcode = ((PlainSASLResult) saslResult).getPassword();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
|
false,
|
||||||
|
null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
try {
|
||||||
|
internalSession.close(false);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error("error closing internal session", e);
|
||||||
|
}
|
||||||
connection.close();
|
connection.close();
|
||||||
amqpConnection.close();
|
amqpConnection.close();
|
||||||
}
|
}
|
||||||
|
@ -151,4 +206,28 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
|
||||||
public void sendSASLSupported() {
|
public void sendSASLSupported() {
|
||||||
connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
|
connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
|
||||||
|
String remote = connection.getRemoteContainer();
|
||||||
|
|
||||||
|
if (ExtCapability.needUniqueConnection(connection)) {
|
||||||
|
if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) {
|
||||||
|
//https://issues.apache.org/jira/browse/ARTEMIS-728
|
||||||
|
Map<Symbol, Object> connProp = new HashMap<>();
|
||||||
|
connProp.put(CONNECTION_OPEN_FAILED, "true");
|
||||||
|
connection.setProperties(connProp);
|
||||||
|
connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
|
||||||
|
Map<Symbol, Symbol> info = new HashMap<>();
|
||||||
|
info.put(INVALID_FIELD, CONTAINER_ID);
|
||||||
|
connection.getCondition().setInfo(info);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createInternalSessionName() {
|
||||||
|
return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
package org.proton.plug;
|
package org.proton.plug;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
|
|
||||||
public interface AMQPConnectionCallback {
|
public interface AMQPConnectionCallback {
|
||||||
|
|
||||||
|
void init() throws Exception;
|
||||||
|
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,4 +44,6 @@ public interface AMQPConnectionCallback {
|
||||||
boolean isSupportsAnonymous();
|
boolean isSupportsAnonymous();
|
||||||
|
|
||||||
void sendSASLSupported();
|
void sendSASLSupported();
|
||||||
|
|
||||||
|
boolean validateConnection(Connection connection, SASLResult saslResult);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,6 +60,7 @@ public class AmqpSupport {
|
||||||
// Lifetime policy symbols
|
// Lifetime policy symbols
|
||||||
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
|
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
|
||||||
|
|
||||||
|
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
|
||||||
/**
|
/**
|
||||||
* Search for a given Symbol in a given array of Symbol object.
|
* Search for a given Symbol in a given array of Symbol object.
|
||||||
*
|
*
|
||||||
|
|
|
@ -188,6 +188,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean validateConnection(Connection connection) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void initInternal() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
// This listener will perform a bunch of things here
|
// This listener will perform a bunch of things here
|
||||||
class LocalListener extends DefaultEventHandler {
|
class LocalListener extends DefaultEventHandler {
|
||||||
|
|
||||||
|
@ -213,13 +220,25 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
|
||||||
@Override
|
@Override
|
||||||
public void onRemoteOpen(Connection connection) throws Exception {
|
public void onRemoteOpen(Connection connection) throws Exception {
|
||||||
synchronized (getLock()) {
|
synchronized (getLock()) {
|
||||||
connection.setContext(AbstractConnectionContext.this);
|
try {
|
||||||
connection.setContainer(containerId);
|
initInternal();
|
||||||
connection.setProperties(connectionProperties);
|
}
|
||||||
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
|
catch (Exception e) {
|
||||||
connection.open();
|
log.error("Error init connection", e);
|
||||||
|
}
|
||||||
|
if (!validateConnection(connection)) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
connection.setContext(AbstractConnectionContext.this);
|
||||||
|
connection.setContainer(containerId);
|
||||||
|
connection.setProperties(connectionProperties);
|
||||||
|
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
|
||||||
|
connection.open();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
initialise();
|
initialise();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
|
* This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
|
||||||
* but its here in case we add support for outbound connections.
|
* but its here in case we add support for outbound connections.
|
||||||
|
|
|
@ -39,7 +39,7 @@ public class ProtonInitializable {
|
||||||
|
|
||||||
public void initialise() throws Exception {
|
public void initialise() throws Exception {
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
initialized = false;
|
initialized = true;
|
||||||
try {
|
try {
|
||||||
if (afterInit != null) {
|
if (afterInit != null) {
|
||||||
afterInit.run();
|
afterInit.run();
|
||||||
|
|
|
@ -16,10 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.proton.plug.context.server;
|
package org.proton.plug.context.server;
|
||||||
|
|
||||||
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
|
|
||||||
|
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
import org.apache.qpid.proton.engine.Link;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
import org.apache.qpid.proton.engine.Sender;
|
import org.apache.qpid.proton.engine.Sender;
|
||||||
|
@ -30,6 +29,7 @@ import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.context.AbstractConnectionContext;
|
import org.proton.plug.context.AbstractConnectionContext;
|
||||||
import org.proton.plug.context.AbstractProtonSessionContext;
|
import org.proton.plug.context.AbstractProtonSessionContext;
|
||||||
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
|
import org.proton.plug.handler.ExtCapability;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -58,6 +58,16 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
|
||||||
return protonSession;
|
return protonSession;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean validateConnection(Connection connection) {
|
||||||
|
return connectionCallback.validateConnection(connection, handler.getSASLResult());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initInternal() throws Exception {
|
||||||
|
connectionCallback.init();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void remoteLinkOpened(Link link) throws Exception {
|
protected void remoteLinkOpened(Link link) throws Exception {
|
||||||
|
|
||||||
|
@ -84,6 +94,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Symbol[] getConnectionCapabilitiesOffered() {
|
public Symbol[] getConnectionCapabilitiesOffered() {
|
||||||
return new Symbol[]{DELAYED_DELIVERY};
|
return ExtCapability.getCapabilities();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.proton.plug.handler;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
|
|
||||||
|
import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
|
||||||
|
import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
|
||||||
|
|
||||||
|
public class ExtCapability {
|
||||||
|
|
||||||
|
public static final Symbol[] capabilities = new Symbol[] {
|
||||||
|
SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY
|
||||||
|
};
|
||||||
|
|
||||||
|
public static Symbol[] getCapabilities() {
|
||||||
|
return capabilities;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean needUniqueConnection(Connection connection) {
|
||||||
|
Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
|
||||||
|
if (extCapabilities != null) {
|
||||||
|
for (Symbol sym : extCapabilities) {
|
||||||
|
if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -386,5 +386,4 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.junit.Test;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.handler.EventHandler;
|
import org.proton.plug.handler.EventHandler;
|
||||||
|
@ -71,6 +72,10 @@ public class AbstractConnectionContextTest {
|
||||||
|
|
||||||
private class TestConnectionCallback implements AMQPConnectionCallback {
|
private class TestConnectionCallback implements AMQPConnectionCallback {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
||||||
|
@ -110,5 +115,10 @@ public class AbstractConnectionContextTest {
|
||||||
public void sendSASLSupported() {
|
public void sendSASLSupported() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,10 +21,12 @@ import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
import org.proton.plug.context.server.ProtonServerConnectionContext;
|
import org.proton.plug.context.server.ProtonServerConnectionContext;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
|
@ -58,6 +60,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
mainExecutor.shutdown();
|
mainExecutor.shutdown();
|
||||||
|
@ -78,6 +84,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
|
@ -125,6 +136,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
class ReturnSPI implements AMQPConnectionCallback {
|
class ReturnSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
||||||
|
@ -145,6 +160,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
||||||
|
|
||||||
|
|
|
@ -22,10 +22,12 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
import org.proton.plug.sasl.ServerSASLPlain;
|
import org.proton.plug.sasl.ServerSASLPlain;
|
||||||
|
@ -52,6 +54,10 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
||||||
|
@ -72,6 +78,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
final ReusableLatch latch = new ReusableLatch(0);
|
final ReusableLatch latch = new ReusableLatch(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,10 +25,12 @@ import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
import org.proton.plug.sasl.ServerSASLPlain;
|
import org.proton.plug.sasl.ServerSASLPlain;
|
||||||
|
@ -48,6 +50,10 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
|
@ -80,6 +86,11 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
||||||
final int bufferSize = bytes.writerIndex();
|
final int bufferSize = bytes.writerIndex();
|
||||||
|
|
|
@ -27,6 +27,7 @@ import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.ExceptionListener;
|
import javax.jms.ExceptionListener;
|
||||||
|
import javax.jms.InvalidClientIDException;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.MapMessage;
|
import javax.jms.MapMessage;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
@ -1533,6 +1534,35 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientID() throws Exception {
|
||||||
|
Connection testConn1 = createConnection(false);
|
||||||
|
Connection testConn2 = createConnection(false);
|
||||||
|
try {
|
||||||
|
testConn1.setClientID("client-id1");
|
||||||
|
try {
|
||||||
|
testConn1.setClientID("client-id2");
|
||||||
|
fail("didn't get expected exception");
|
||||||
|
}
|
||||||
|
catch (javax.jms.IllegalStateException e) {
|
||||||
|
//expected
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
testConn2.setClientID("client-id1");
|
||||||
|
fail("didn't get expected exception");
|
||||||
|
}
|
||||||
|
catch (InvalidClientIDException e) {
|
||||||
|
//expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
testConn1.close();
|
||||||
|
testConn2.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private javax.jms.Queue createQueue(String address) throws Exception {
|
private javax.jms.Queue createQueue(String address) throws Exception {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
try {
|
try {
|
||||||
|
@ -1543,29 +1573,19 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private javax.jms.Connection createConnection() throws JMSException {
|
private Connection createConnection() throws JMSException {
|
||||||
|
return this.createConnection(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
|
||||||
Connection connection;
|
Connection connection;
|
||||||
if (protocol == 3) {
|
if (protocol == 3) {
|
||||||
factory = new JmsConnectionFactory(amqpConnectionUri);
|
factory = new JmsConnectionFactory(amqpConnectionUri);
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
connection.setExceptionListener(new ExceptionListener() {
|
|
||||||
@Override
|
|
||||||
public void onException(JMSException exception) {
|
|
||||||
exception.printStackTrace();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
connection.start();
|
|
||||||
}
|
}
|
||||||
else if (protocol == 0) {
|
else if (protocol == 0) {
|
||||||
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
|
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
connection.setExceptionListener(new ExceptionListener() {
|
|
||||||
@Override
|
|
||||||
public void onException(JMSException exception) {
|
|
||||||
exception.printStackTrace();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
connection.start();
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
TransportConfiguration transport;
|
TransportConfiguration transport;
|
||||||
|
@ -1579,6 +1599,8 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = factory.createConnection(userName, password);
|
connection = factory.createConnection(userName, password);
|
||||||
|
}
|
||||||
|
if (isStart) {
|
||||||
connection.setExceptionListener(new ExceptionListener() {
|
connection.setExceptionListener(new ExceptionListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onException(JMSException exception) {
|
public void onException(JMSException exception) {
|
||||||
|
|
Loading…
Reference in New Issue