ARTEMIS-4689 import command should accept URL

This commit is contained in:
Justin Bertram 2024-03-14 12:24:03 -05:00 committed by clebertsuconic
parent 308aed3060
commit 71d3393224
6 changed files with 333 additions and 192 deletions

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.Shell;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import picocli.CommandLine.Option;
public class BasicConnectionAbstract extends InputAbstract {
@Option(names = "--url", description = "Connection URL. Default: build URL from the 'artemis' acceptor defined in the broker.xml or tcp://localhost:61616 if the acceptor cannot be parsed.")
protected String brokerURL = DEFAULT_BROKER_URL;
@Option(names = "--acceptor", description = "Name used to find the default connection URL on the acceptor list. If an acceptor with that name cannot be found the CLI will look for a connector with the same name.")
protected String acceptor;
@Option(names = "--user", description = "User used to connect.")
protected String user;
@Option(names = "--password", description = "Password used to connect.")
protected String password;
protected static ThreadLocal<ConnectionInformation> CONNECTION_INFORMATION = new ThreadLocal<>();
static class ConnectionInformation {
String uri, user, password;
private ConnectionInformation(String uri, String user, String password) {
this.uri = uri;
this.user = user;
this.password = password;
}
}
public String getBrokerURL() {
return brokerURL;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
public String getAcceptor() {
return acceptor;
}
public BasicConnectionAbstract setAcceptor(String acceptor) {
this.acceptor = acceptor;
return this;
}
public String getUser() {
return user;
}
public BasicConnectionAbstract setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
public BasicConnectionAbstract setPassword(String password) {
this.password = password;
return this;
}
@SuppressWarnings("StringEquality")
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
recoverConnectionInformation();
// it is intentional to make a comparison on the String object here
// this is to test if the original option was switched or not.
// we don't care about being .equals at all.
// as a matter of fact if you pass brokerURL in a way it's equals to DEFAULT_BROKER_URL,
// we should not the broker URL Instance
// and still honor the one passed by parameter.
// SupressWarnings was added to this method to supress the false positive here from error-prone.
if (brokerURL == DEFAULT_BROKER_URL) {
String brokerURLInstance = getBrokerURLInstance(acceptor);
if (brokerURLInstance != null) {
brokerURL = brokerURLInstance;
}
}
context.out.println("Connection brokerURL = " + brokerURL);
return null;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
recoverConnectionInformation();
return createConnectionFactory(brokerURL, user, password);
}
protected ConnectionFactory createConnectionFactory(String brokerURL,
String user,
String password) throws Exception {
recoverConnectionInformation();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
try {
tryConnect(brokerURL, user, password, cf);
return cf;
} catch (JMSSecurityException e) {
// if a security exception will get the user and password through an input
if (getActionContext() != null) {
getActionContext().err.println("Connection failed::" + e.getMessage());
}
user = inputUser(user);
password = inputPassword(password);
cf = new ActiveMQConnectionFactory(brokerURL, user, password);
try {
tryConnect(brokerURL, user, password, cf);
} catch (Exception e2) {
}
return cf;
} catch (JMSException e) {
// if a connection exception will ask for the URL, user and password
if (getActionContext() != null) {
getActionContext().err.println("Connection failed::" + e.getMessage());
}
brokerURL = inputBrokerURL(brokerURL);
user = inputUser(user);
password = inputPassword(password);
cf = new ActiveMQConnectionFactory(brokerURL, user, password);
try {
tryConnect(brokerURL, user, password, cf);
} catch (Exception e2) {
}
return cf;
}
}
protected void recoverConnectionInformation() {
if (CONNECTION_INFORMATION.get() != null) {
ConnectionInformation connectionInfo = CONNECTION_INFORMATION.get();
if (this.user == null) {
this.user = connectionInfo.user;
}
if (this.password == null) {
this.password = connectionInfo.password;
}
if (this.brokerURL == null || this.brokerURL == DEFAULT_BROKER_URL) {
this.brokerURL = connectionInfo.uri;
}
}
}
protected void saveConnectionInfo(String brokerURL, String user, String password) {
if (Shell.inShell() && CONNECTION_INFORMATION.get() == null) {
CONNECTION_INFORMATION.set(new ConnectionInformation(brokerURL, user, password));
getActionContext().out.println("CLI connected to broker " + brokerURL + ", user:" + user);
this.brokerURL = brokerURL;
this.user = user;
this.password = password;
}
}
protected void tryConnect(String brokerURL,
String user,
String password,
ConnectionFactory cf) throws JMSException {
Connection connection = cf.createConnection();
connection.close();
saveConnectionInfo(brokerURL, user, password);
}
protected String inputBrokerURL(String defaultValue) {
return input("--url", "Type in the connection URL for a retry (e.g. tcp://localhost:61616)", defaultValue);
}
protected String inputUser(String user) {
if (user == null) {
this.user = input("--user", "Type the username for a retry", null);
return this.user;
}
return user;
}
protected String inputPassword(String password) {
if (password == null) {
this.password = inputPassword("--password", "Type the password for a retry", null);
return this.password;
}
return password;
}
protected void performCoreManagement(ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
try (ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory()) {
ManagementHelper.doManagement(factory.getServerLocator(), user, password, setup, ok, failed);
}
}
protected void performCoreManagement(String uri, String user, String password, ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
try (ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(uri, user, password)) {
ManagementHelper.doManagement(factory.getServerLocator(), user, password, setup, ok, failed);
}
}
}

View File

@ -16,85 +16,21 @@
*/
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.Shell;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import picocli.CommandLine.Option;
public class ConnectionAbstract extends InputAbstract {
@Option(names = "--url", description = "Connection URL. Default: build URL from the 'artemis' acceptor defined in the broker.xml or tcp://localhost:61616 if the acceptor cannot be parsed.")
protected String brokerURL = DEFAULT_BROKER_URL;
@Option(names = "--acceptor", description = "Name used to find the default connection URL on the acceptor list. If an acceptor with that name cannot be found the CLI will look for a connector with the same name.")
protected String acceptor;
@Option(names = "--user", description = "User used to connect.")
protected String user;
@Option(names = "--password", description = "Password used to connect.")
protected String password;
public class ConnectionAbstract extends BasicConnectionAbstract {
@Option(names = "--clientID", description = "ClientID set on the connection.")
protected String clientID;
@Option(names = "--protocol", description = "Protocol used. Valid values are ${COMPLETION-CANDIDATES}", converter = ConnectionProtocol.ProtocolConverter.class)
protected ConnectionProtocol protocol = ConnectionProtocol.CORE;
protected static ThreadLocal<ConnectionInformation> CONNECTION_INFORMATION = new ThreadLocal<>();
static class ConnectionInformation {
String uri, user, password;
private ConnectionInformation(String uri, String user, String password) {
this.uri = uri;
this.user = user;
this.password = password;
}
}
public String getBrokerURL() {
return brokerURL;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
public String getAcceptor() {
return acceptor;
}
public ConnectionAbstract setAcceptor(String acceptor) {
this.acceptor = acceptor;
return this;
}
public String getUser() {
return user;
}
public ConnectionAbstract setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
public ConnectionAbstract setPassword(String password) {
this.password = password;
return this;
}
public String getClientID() {
return clientID;
}
@ -116,33 +52,7 @@ public class ConnectionAbstract extends InputAbstract {
this.protocol = ConnectionProtocol.fromString(protocol);
}
@SuppressWarnings("StringEquality")
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
recoverConnectionInformation();
// it is intentional to make a comparison on the String object here
// this is to test if the original option was switched or not.
// we don't care about being .equals at all.
// as a matter of fact if you pass brokerURL in a way it's equals to DEFAULT_BROKER_URL,
// we should not the broker URL Instance
// and still honor the one passed by parameter.
// SupressWarnings was added to this method to supress the false positive here from error-prone.
if (brokerURL == DEFAULT_BROKER_URL) {
String brokerURLInstance = getBrokerURLInstance(acceptor);
if (brokerURLInstance != null) {
brokerURL = brokerURLInstance;
}
}
context.out.println("Connection brokerURL = " + brokerURL);
return null;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
recoverConnectionInformation();
return createConnectionFactory(brokerURL, user, password, clientID, protocol);
@ -215,32 +125,6 @@ public class ConnectionAbstract extends InputAbstract {
return createCoreConnectionFactory(brokerURL, user, password, clientID);
}
protected void recoverConnectionInformation() {
if (CONNECTION_INFORMATION.get() != null) {
ConnectionInformation connectionInfo = CONNECTION_INFORMATION.get();
if (this.user == null) {
this.user = connectionInfo.user;
}
if (this.password == null) {
this.password = connectionInfo.password;
}
if (this.brokerURL == null || this.brokerURL == DEFAULT_BROKER_URL) {
this.brokerURL = connectionInfo.uri;
}
}
}
void saveConnectionInfo(String brokerURL, String user, String password) {
if (Shell.inShell() && CONNECTION_INFORMATION.get() == null) {
CONNECTION_INFORMATION.set(new ConnectionInformation(brokerURL, user, password));
getActionContext().out.println("CLI connected to broker " + brokerURL + ", user:" + user);
this.brokerURL = brokerURL;
this.user = user;
this.password = password;
}
}
protected ActiveMQConnectionFactory createCoreConnectionFactory(String brokerURL,
String user,
String password,
@ -293,45 +177,4 @@ public class ConnectionAbstract extends InputAbstract {
return cf;
}
}
private void tryConnect(String brokerURL,
String user,
String password,
ConnectionFactory cf) throws JMSException {
Connection connection = cf.createConnection();
connection.close();
saveConnectionInfo(brokerURL, user, password);
}
private String inputBrokerURL(String defaultValue) {
return input("--url", "Type in the connection URL for a retry (e.g. tcp://localhost:61616)", defaultValue);
}
private String inputUser(String user) {
if (user == null) {
this.user = input("--user", "Type the username for a retry", null);
return this.user;
}
return user;
}
private String inputPassword(String password) {
if (password == null) {
this.password = inputPassword("--password", "Type the password for a retry", null);
return this.password;
}
return password;
}
protected void performCoreManagement(ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory()) {
ManagementHelper.doManagement(factory.getServerLocator(), user, password, setup, ok, failed);
}
}
protected void performCoreManagement(String uri, String user, String password, ManagementHelper.MessageAcceptor setup, ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) throws Exception {
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory(uri, user, password, null)) {
ManagementHelper.doManagement(factory.getServerLocator(), user, password, setup, ok, failed);
}
}
}

View File

@ -32,6 +32,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import org.apache.activemq.artemis.api.core.Message;
@ -48,8 +49,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.commands.ActionAbstract;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
@ -68,7 +69,7 @@ import picocli.CommandLine.Option;
* for speed and simplicity.
*/
@Command(name = "imp", description = "Import all message-data using an XML that could be interpreted by any system.")
public final class XmlDataImporter extends ActionAbstract {
public final class XmlDataImporter extends ConnectionAbstract {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -90,10 +91,12 @@ public final class XmlDataImporter extends ActionAbstract {
private ClientSession session;
private ClientProducer producer;
@Option(names = "--host", description = "The host used to import the data. Default: localhost.")
public String host = "localhost";
@Deprecated(forRemoval = true)
@Option(names = "--host", description = "The host used to import the data. Default: null.", hidden = true)
public String host = null;
@Option(names = "--port", description = "The port used to import the data. Default: 61616.")
@Deprecated(forRemoval = true)
@Option(names = "--port", description = "The port used to import the data. Default: 61616.", hidden = true)
public int port = 61616;
@Option(names = "--transaction", description = "Import every message using a single transction. If anything goes wrong during the process the entire import will be aborted. Default: false.", hidden = true)
@ -102,12 +105,6 @@ public final class XmlDataImporter extends ActionAbstract {
@Option(names = "--commit-interval", description = "How often to commit.", hidden = true)
public int commitInterval = 1000;
@Option(names = "--user", description = "User name used to import the data. Default: null.")
public String user = null;
@Option(names = "--password", description = "User name used to import the data. Default: null.")
public String password = null;
@Option(names = "--input", description = "The input file name. Default: exp.dmp.", required = true)
public String input = "exp.dmp";
@ -119,22 +116,6 @@ public final class XmlDataImporter extends ActionAbstract {
TreeSet<XMLMessageImporter.MessageInfo> messages;
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
@Override
public Object execute(ActionContext context) throws Exception {
process(input, host, port);
@ -174,6 +155,7 @@ public final class XmlDataImporter extends ActionAbstract {
public void process(InputStream inputStream,
ClientSession session,
ClientSession managementSession) throws Exception {
Objects.requireNonNull(inputStream);
reader = XmlProvider.createXMLStreamReader(inputStream);
messageReader = new XMLMessageImporter(reader, session);
messageReader.setOldPrefixTranslation(oldPrefixTranslation);
@ -190,10 +172,16 @@ public final class XmlDataImporter extends ActionAbstract {
}
public void process(InputStream inputStream, String host, int port) throws Exception {
HashMap<String, Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
ServerLocator serverLocator;
if (host != null) {
Map<String, Object> connectionParams = Map.of(
TransportConstants.HOST_PROP_NAME, host,
TransportConstants.PORT_PROP_NAME, Integer.toString(port)
);
serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
} else {
serverLocator = ActiveMQClient.createServerLocator(brokerURL);
}
ClientSessionFactory sf = serverLocator.createSessionFactory();
ClientSession session = null;

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.tools.xml;
import java.io.InputStream;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.cli.test.CliTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class XmlDataImporterTest extends CliTestBase {
ActiveMQServer server;
@Before
@Override
public void setup() throws Exception {
super.setup();
server = ((Pair<ManagementContext, ActiveMQServer>)startServer()).getB();
}
@After
@Override
public void tearDown() throws Exception {
stopServer();
super.tearDown();
}
@Test
public void testBrokerUrl() throws Exception {
XmlDataImporter importer = new XmlDataImporter();
importer.setBrokerURL("tcp://localhost:61616");
importer.setUser("admin");
importer.setPassword("admin");
assertEquals(0L, server.getActiveMQServerControl().getTotalConnectionCount());
try {
importer.process((InputStream) null, null, 0);
} catch (NullPointerException npe) {
/*
* A NullPointerException is expected here since InputStream is null, but the connection should still be made
* if the broker URL is input & used successfully.
*/
}
assertEquals(1L, server.getActiveMQServerControl().getTotalConnectionCount());
}
@Test
public void testHostAndPort() throws Exception {
XmlDataImporter importer = new XmlDataImporter();
importer.setUser("admin");
importer.setPassword("admin");
assertEquals(0L, server.getActiveMQServerControl().getTotalConnectionCount());
try {
importer.process((InputStream) null, "localhost", 61616);
} catch (NullPointerException npe) {
/*
* A NullPointerException is expected here since InputStream is null, but the connection should still be made
* if the host & port are input & used successfully.
*/
}
assertEquals(1L, server.getActiveMQServerControl().getTotalConnectionCount());
}
}

View File

@ -48,12 +48,12 @@ public class CliPerfClientTest extends CliTestBase {
@Test
public void testNonDurableStarts() throws Exception {
new PerfClientCommand().setDurableSubscription(false).setMessageCount(1).setUser("admin").setPassword("admin").setClientID("perfClientTest").execute(new TestActionContext());
new PerfClientCommand().setDurableSubscription(false).setMessageCount(1).setClientID("perfClientTest").setUser("admin").setPassword("admin").execute(new TestActionContext());
}
@Test
public void testDurableStarts() throws Exception {
new PerfClientCommand().setDurableSubscription(true).setMessageCount(1).setUser("admin").setPassword("admin").setClientID("perfClientTest").execute(new TestActionContext());
new PerfClientCommand().setDurableSubscription(true).setMessageCount(1).setClientID("perfClientTest").setUser("admin").setPassword("admin").execute(new TestActionContext());
}
@Test

View File

@ -157,9 +157,9 @@ public class MessageSerializerTest extends CliTestBase {
.setDurable(durable)
.setDestination(address)
.setMessageCount(messageCount)
.setClientID(clientId)
.setUser("admin")
.setPassword("admin")
.setClientID(clientId)
.execute(new TestActionContext());
}