Adding JORAM Tests for AMQP with a few fixes around the protocol manager for JMS

This commit is contained in:
Clebert Suconic 2016-05-24 18:09:16 -04:00
parent b27033d7c2
commit 10dfe97ec0
36 changed files with 1322 additions and 439 deletions

View File

@ -23,6 +23,8 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashSet;
import org.jboss.logging.Logger;
import static java.nio.file.attribute.PosixFilePermission.GROUP_EXECUTE;
import static java.nio.file.attribute.PosixFilePermission.GROUP_READ;
import static java.nio.file.attribute.PosixFilePermission.GROUP_WRITE;
@ -34,6 +36,8 @@ import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
public class FileUtil {
private static final Logger logger = Logger.getLogger(FileUtil.class);
public static void makeExec(File file) throws IOException {
try {
Files.setPosixFilePermissions(file.toPath(), new HashSet<>(Arrays.asList(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, GROUP_WRITE, GROUP_EXECUTE, OTHERS_READ, OTHERS_EXECUTE)));
@ -43,4 +47,32 @@ public class FileUtil {
}
}
public static final boolean deleteDirectory(final File directory) {
if (directory.isDirectory()) {
String[] files = directory.list();
int num = 5;
int attempts = 0;
while (files == null && (attempts < num)) {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
}
files = directory.list();
attempts++;
}
for (String file : files) {
File f = new File(directory, file);
if (!deleteDirectory(f)) {
logger.warn("Failed to clean up file: " + f.getAbsolutePath());
}
}
}
return directory.delete();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client;
package org.apache.activemq.artemis.utils;
import java.util.ArrayList;
import java.util.List;

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.utils.SelectorTranslator;
/**
* ActiveMQ Artemis implementation of a JMS QueueBrowser.

View File

@ -59,6 +59,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.utils.SelectorTranslator;
/**
* ActiveMQ Artemis implementation of a JMS Session.

View File

@ -42,10 +42,10 @@ import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONObject;

View File

@ -35,8 +35,8 @@ import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONObject;

View File

@ -61,7 +61,6 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
@ -82,6 +81,7 @@ import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
import org.apache.activemq.artemis.jms.server.management.impl.JMSManagementServiceImpl;
import org.apache.activemq.artemis.jms.transaction.JMSTransactionDetail;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.json.JSONArray;

View File

@ -88,7 +88,13 @@ public class ActiveMQJMSVendor implements JMSVendor {
@Override
public void setJMSXGroupID(Message message, String s) {
try {
message.setStringProperty("_AMQ_GROUP_ID", s);
}
catch (Exception e) {
e.printStackTrace();
}
}
@Override
@ -98,7 +104,6 @@ public class ActiveMQJMSVendor implements JMSVendor {
@Override
public void setJMSXDeliveryCount(Message message, long l) {
}
public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {

View File

@ -217,7 +217,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
String value = msg.getStringProperty(key);
props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
}
else if (key.startsWith("JMSXGroupID")) {
else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) {
String value = msg.getStringProperty(key);
props.setGroupId(value);
if (apMap == null) {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -158,11 +159,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public Object createSender(ProtonPlugSender protonSender,
String queue,
String filer,
String filter,
boolean browserOnly) throws Exception {
long consumerID = consumerIDGenerator.generateID();
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filer), browserOnly);
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
// AMQP handles its own flow control for when it's started
consumer.setStarted(true);
@ -274,12 +277,12 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
public void rollbackCurrentTX() throws Exception {
public void rollbackCurrentTX(boolean lastMessageDelivered) throws Exception {
//need to check here as this can be called if init fails
if (serverSession != null) {
recoverContext();
try {
serverSession.rollback(false);
serverSession.rollback(lastMessageDelivered);
}
finally {
resetContext();

View File

@ -59,7 +59,7 @@ public interface AMQPSessionCallback {
void commitCurrentTX() throws Exception;
void rollbackCurrentTX() throws Exception;
void rollbackCurrentTX(boolean lastMessageReceived) throws Exception;
void close() throws Exception;

View File

@ -140,7 +140,7 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
senders.clear();
try {
if (sessionSPI != null) {
sessionSPI.rollbackCurrentTX();
sessionSPI.rollbackCurrentTX(false);
sessionSPI.close();
}
}

View File

@ -78,7 +78,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Discharge discharge = (Discharge) action;
if (discharge.getFail()) {
try {
sessionSPI.rollbackCurrentTX();
sessionSPI.rollbackCurrentTX(true);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());

View File

@ -120,7 +120,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void rollbackCurrentTX() {
public void rollbackCurrentTX(boolean lastMessage) {
}
@Override

View File

@ -36,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.rest.util.Constants;
import org.apache.activemq.artemis.rest.util.LinkStrategy;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
/**
* Auto-acknowleged consumer

View File

@ -20,9 +20,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
import org.apache.activemq.artemis.rest.ActiveMQRestLogger;
import org.apache.activemq.artemis.rest.queue.push.xml.PushRegistration;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import java.util.ArrayList;
import java.util.List;

View File

@ -129,6 +129,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -2045,29 +2046,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected static final boolean deleteDirectory(final File directory) {
if (directory.isDirectory()) {
String[] files = directory.list();
int num = 5;
int attempts = 0;
while (files == null && (attempts < num)) {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
}
files = directory.list();
attempts++;
}
for (String file : files) {
File f = new File(directory, file);
if (!deleteDirectory(f)) {
log.warn("Failed to clean up file: " + f.getAbsolutePath());
}
}
}
return directory.delete();
return FileUtil.deleteDirectory(directory);
}
protected static final void copyRecursive(final File from, final File to) throws Exception {

View File

@ -44,6 +44,12 @@
<artifactId>artemis-ra</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-cli</artifactId>
@ -86,6 +92,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid.jms.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.amqpJMS;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Hashtable;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.apache.activemq.artemis.common.AbstractAdmin;
import org.apache.activemq.artemis.common.testjndi.TestContextFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTopic;
/**
*
*/
public class ActiveMQAMQPAdmin extends AbstractAdmin {
Context context;
{
// enableJMSFrameTracing();
try {
// Use the jetty JNDI context since it's mutable.
Hashtable<String, String> env = new Hashtable<>();
env.put("java.naming.factory.initial", TestContextFactory.class.getName());
env.put("java.naming.provider.url", "tcp://localhost:61616");
context = new InitialContext(env);
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("resource")
public static void enableJMSFrameTracing() {
try {
String outputStreamName = "amqp-trace.txt";
final PrintStream out = new PrintStream(new FileOutputStream(new File(outputStreamName)));
Handler handler = new Handler() {
@Override
public void publish(LogRecord r) {
out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
}
@Override
public void flush() {
out.flush();
}
@Override
public void close() throws SecurityException {
out.close();
}
};
Logger log = Logger.getLogger("FRM");
log.addHandler(handler);
log.setLevel(Level.FINEST);
}
catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public String getName() {
return getClass().getName();
}
@Override
public Context createContext() throws NamingException {
return context;
}
@Override
public void createQueue(String name) {
super.createQueue(name);
try {
context.bind(name, new JmsQueue("jms.queue." + name));
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void createTopic(String name) {
super.createTopic(name);
try {
context.bind(name, new JmsTopic("jms.topic." + name));
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteQueue(String name) {
super.deleteQueue(name);
try {
context.unbind(name);
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteTopic(String name) {
super.deleteTopic(name);
try {
context.unbind(name);
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void createConnectionFactory(String name) {
try {
final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
context.bind(name, factory);
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteConnectionFactory(String name) {
try {
context.unbind(name);
}
catch (NamingException e) {
throw new RuntimeException(e);
}
}
@Override
public void createQueueConnectionFactory(String name) {
createConnectionFactory(name);
}
@Override
public void createTopicConnectionFactory(String name) {
createConnectionFactory(name);
}
@Override
public void deleteQueueConnectionFactory(String name) {
deleteConnectionFactory(name);
}
@Override
public void deleteTopicConnectionFactory(String name) {
deleteConnectionFactory(name);
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.amqpJMS;
import java.io.IOException;
import java.util.Properties;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import org.objectweb.jtests.jms.admin.Admin;
import org.objectweb.jtests.jms.admin.AdminFactory;
import org.objectweb.jtests.jms.conform.connection.ConnectionTest;
import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest;
import org.objectweb.jtests.jms.conform.message.MessageBodyTest;
import org.objectweb.jtests.jms.conform.message.MessageDefaultTest;
import org.objectweb.jtests.jms.conform.message.MessageTypeTest;
import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest;
import org.objectweb.jtests.jms.conform.message.properties.JMSXPropertyTest;
import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest;
import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest;
import org.objectweb.jtests.jms.conform.queue.QueueBrowserTest;
import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest;
import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest;
import org.objectweb.jtests.jms.conform.selector.SelectorTest;
import org.objectweb.jtests.jms.conform.session.QueueSessionTest;
import org.objectweb.jtests.jms.conform.session.SessionTest;
import org.objectweb.jtests.jms.conform.session.TopicSessionTest;
import org.objectweb.jtests.jms.conform.session.UnifiedSessionTest;
import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
import org.objectweb.jtests.jms.framework.JMSTestCase;
@RunWith(Suite.class)
@SuiteClasses({TopicConnectionTest.class, ConnectionTest.class, MessageBodyTest.class, MessageDefaultTest.class,
MessageTypeTest.class, MessageHeaderTest.class, JMSXPropertyTest.class, MessagePropertyConversionTest.class, MessagePropertyTest.class,
QueueBrowserTest.class, TemporaryQueueTest.class, SelectorSyntaxTest.class, SelectorTest.class, QueueSessionTest.class, SessionTest.class,
TopicSessionTest.class, UnifiedSessionTest.class, TemporaryTopicTest.class,})
public class JoramAMQPAggregationTest extends Assert {
/**
* Should be overridden
*
* @return
*/
protected static Properties getProviderProperties() throws IOException {
Properties props = new Properties();
props.load(ClassLoader.getSystemResourceAsStream(JMSTestCase.getPropFileName()));
return props;
}
static Admin admin;
@BeforeClass
public static void setUpServer() throws Exception {
JMSTestCase.setPropFileName("amqp_provider.properties");
JMSTestCase.startServer = false;
// Admin step
// gets the provider administration wrapper...
Properties props = getProviderProperties();
admin = AdminFactory.getAdmin(props);
admin.startServer();
}
@AfterClass
public static void tearDownServer() throws Exception {
admin.stopServer();
JMSTestCase.startServer = true;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms;
package org.apache.activemq.artemis.common;
import javax.naming.Context;
import javax.naming.InitialContext;
@ -22,15 +22,14 @@ import javax.naming.NamingException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Hashtable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -39,40 +38,41 @@ import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.junit.Assert;
import org.objectweb.jtests.jms.admin.Admin;
public class ActiveMQAdmin implements Admin {
/**
* AbstractAdmin.
*/
public class AbstractAdmin implements Admin {
private ClientSession clientSession;
protected ClientSession clientSession;
private ClientRequestor requestor;
protected ClientRequestor requestor;
private Context context;
protected boolean serverLifeCycleActive;
private Process serverProcess;
protected Process serverProcess;
private ClientSessionFactory sf;
protected ServerLocator serverLocator;
protected ClientSessionFactory sf;
// this is a constant to control if we should use a separate VM for the server.
public static final boolean spawnServer = false;
ServerLocator serverLocator;
/**
* Determines whether to act or 'no-op' on {@link ActiveMQAdmin#serverStart()} and
* {@link ActiveMQAdmin#serverStop()}. This is used when testing combinations of client and
* Determines whether to act or 'no-op' on serverStart() and
* serverStop(). This is used when testing combinations of client and
* servers with different versions.
*
* @see https://github.com/activemq/activemq-version-tests
*/
private final boolean serverLifeCycleActive;
private static final String SERVER_LIVE_CYCLE_PROPERTY = "org.apache.activemq.artemis.jms.ActiveMQAdmin.serverLifeCycle";
private static final String SERVER_LIVE_CYCLE_PROPERTY = "org.apache.activemq.artemis.jms.ActiveMQAMQPAdmin.serverLifeCycle";
public ActiveMQAdmin() {
public AbstractAdmin() {
serverLifeCycleActive = Boolean.valueOf(System.getProperty(SERVER_LIVE_CYCLE_PROPERTY, "true"));
try {
Hashtable<String, String> env = new Hashtable<>();
env.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
env.put("java.naming.provider.url", "tcp://localhost:61616");
context = new InitialContext(env);
}
catch (NamingException e) {
e.printStackTrace();
}
}
@Override
public String getName() {
return getClass().getName();
}
@Override
@ -102,23 +102,18 @@ public class ActiveMQAdmin implements Admin {
}
@Override
public void createConnectionFactory(final String name) {
createConnection(name, 0);
}
private void createConnection(final String name, final int cfType) {
try {
invokeSyncOperation(ResourceNames.JMS_SERVER, "createConnectionFactory", name, false, false, cfType, "netty", name);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
public Context createContext() throws NamingException {
return new InitialContext();
}
@Override
public Context createContext() throws NamingException {
return context;
public void createConnectionFactory(final String name) {
throw new RuntimeException("FIXME NYI createConnectionFactory");
}
@Override
public void deleteConnectionFactory(final String name) {
throw new RuntimeException("FIXME NYI deleteConnectionFactory");
}
@Override
@ -133,38 +128,6 @@ public class ActiveMQAdmin implements Admin {
}
}
@Override
public void createQueueConnectionFactory(final String name) {
createConnection(name, 1);
}
@Override
public void createTopic(final String name) {
Boolean result;
try {
result = (Boolean) invokeSyncOperation(ResourceNames.JMS_SERVER, "createTopic", name, name);
Assert.assertEquals(true, result.booleanValue());
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public void createTopicConnectionFactory(final String name) {
createConnection(name, 2);
}
@Override
public void deleteConnectionFactory(final String name) {
try {
invokeSyncOperation(ResourceNames.JMS_SERVER, "destroyConnectionFactory", name);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public void deleteQueue(final String name) {
Boolean result;
@ -177,11 +140,28 @@ public class ActiveMQAdmin implements Admin {
}
}
@Override
public void createQueueConnectionFactory(final String name) {
createConnectionFactory(name);
}
@Override
public void deleteQueueConnectionFactory(final String name) {
deleteConnectionFactory(name);
}
@Override
public void createTopic(final String name) {
Boolean result;
try {
result = (Boolean) invokeSyncOperation(ResourceNames.JMS_SERVER, "createTopic", name, name);
Assert.assertEquals(true, result.booleanValue());
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public void deleteTopic(final String name) {
Boolean result;
@ -195,13 +175,13 @@ public class ActiveMQAdmin implements Admin {
}
@Override
public void deleteTopicConnectionFactory(final String name) {
deleteConnectionFactory(name);
public void createTopicConnectionFactory(final String name) {
createConnectionFactory(name);
}
@Override
public String getName() {
return this.getClass().getName();
public void deleteTopicConnectionFactory(final String name) {
deleteConnectionFactory(name);
}
@Override
@ -210,37 +190,42 @@ public class ActiveMQAdmin implements Admin {
return;
}
String[] vmArgs = new String[]{};
serverProcess = SpawnedVMSupport.spawnVM(SpawnedJMSServer.class.getName(), vmArgs, false);
InputStreamReader isr = new InputStreamReader(serverProcess.getInputStream());
if (spawnServer) {
String[] vmArgs = new String[]{};
serverProcess = SpawnedVMSupport.spawnVM(SpawnedJMSServer.class.getName(), vmArgs, false);
InputStreamReader isr = new InputStreamReader(serverProcess.getInputStream());
final BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
System.out.println("SERVER: " + line);
if ("OK".equals(line.trim())) {
new Thread() {
@Override
public void run() {
try {
String line1 = null;
while ((line1 = br.readLine()) != null) {
System.out.println("SERVER: " + line1);
final BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
System.out.println("SERVER: " + line);
if ("OK".equals(line.trim())) {
new Thread() {
@Override
public void run() {
try {
String line1 = null;
while ((line1 = br.readLine()) != null) {
System.out.println("SERVER: " + line1);
}
}
catch (Exception e) {
e.printStackTrace();
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}.start();
return;
}
else if ("KO".equals(line.trim())) {
// something went wrong with the server, destroy it:
serverProcess.destroy();
throw new IllegalStateException("Unable to start the spawned server :" + line);
}.start();
return;
}
else if ("KO".equals(line.trim())) {
// something went wrong with the server, destroy it:
serverProcess.destroy();
throw new IllegalStateException("Unable to start the spawned server :" + line);
}
}
}
else {
SpawnedJMSServer.startServer();
}
}
@Override
@ -248,18 +233,23 @@ public class ActiveMQAdmin implements Admin {
if (!serverLifeCycleActive) {
return;
}
OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
osw.write("STOP\n");
osw.flush();
int exitValue = serverProcess.waitFor();
if (exitValue != 0) {
serverProcess.destroy();
if (spawnServer) {
OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
osw.write("STOP\n");
osw.flush();
int exitValue = serverProcess.waitFor();
if (exitValue != 0) {
serverProcess.destroy();
}
}
else {
SpawnedJMSServer.stopServer();
}
}
private Object invokeSyncOperation(final String resourceName,
final String operationName,
final Object... parameters) throws Exception {
protected Object invokeSyncOperation(final String resourceName,
final String operationName,
final Object... parameters) throws Exception {
ClientMessage message = clientSession.createMessage(false);
ManagementHelper.putOperationInvocation(message, resourceName, operationName, parameters);
ClientMessage reply;
@ -282,6 +272,4 @@ public class ActiveMQAdmin implements Admin {
return ManagementHelper.getResult(reply);
}
// Inner classes -------------------------------------------------
}

View File

@ -14,38 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms;
package org.apache.activemq.artemis.common;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.utils.FileUtil;
public class SpawnedJMSServer {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
public static ActiveMQServer server;
public static JMSServerManager serverManager;
// Using files may be useful for debugging (through print-data for instance)
private static final boolean useFiles = false;
// Static --------------------------------------------------------
public static void main(final String[] args) throws Exception {
try {
Configuration config = new ConfigurationImpl().addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName())).setSecurityEnabled(false).addConnectorConfiguration("netty", new TransportConfiguration(NettyConnectorFactory.class.getName()));
// disable server persistence since JORAM tests do not restart server
final ActiveMQServer server = ActiveMQServers.newActiveMQServer(config, false);
JMSServerManager serverManager = new JMSServerManagerImpl(server);
serverManager.start();
startServer();
System.out.println("Server started, ready to start client test");
@ -59,7 +54,7 @@ public class SpawnedJMSServer {
String line = null;
while ((line = br.readLine()) != null) {
if ("STOP".equals(line.trim())) {
server.stop();
stopServer();
System.out.println("Server stopped");
System.exit(0);
}
@ -82,6 +77,35 @@ public class SpawnedJMSServer {
}
}
public static ActiveMQServer startServer() throws Exception {
if (server == null) {
Configuration config = new ConfigurationImpl().addAcceptorConfiguration("netty", "tcp://localhost:61616").setSecurityEnabled(false).addConnectorConfiguration("netty", "tcp://localhost:61616");
File dataPlace = new File("./target/dataJoram");
FileUtil.deleteDirectory(dataPlace);
config.setJournalDirectory(new File(dataPlace, "./journal").getAbsolutePath()).
setPagingDirectory(new File(dataPlace, "./paging").getAbsolutePath()).
setLargeMessagesDirectory(new File(dataPlace, "./largemessages").getAbsolutePath()).
setBindingsDirectory(new File(dataPlace, "./bindings").getAbsolutePath()).setPersistenceEnabled(true);
// disable server persistence since JORAM tests do not restart server
server = ActiveMQServers.newActiveMQServer(config, useFiles);
serverManager = new JMSServerManagerImpl(server);
serverManager.start();
}
return server;
}
public static void stopServer() throws Exception {
if (server != null) {
serverManager.stop();
}
server = null;
serverManager = null;
}
// Constructors --------------------------------------------------
// Public --------------------------------------------------------

View File

@ -0,0 +1,482 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.common.testjndi;
import javax.naming.Binding;
import javax.naming.CompositeName;
import javax.naming.Context;
import javax.naming.LinkRef;
import javax.naming.Name;
import javax.naming.NameClassPair;
import javax.naming.NameNotFoundException;
import javax.naming.NameParser;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.NotContextException;
import javax.naming.OperationNotSupportedException;
import javax.naming.Reference;
import javax.naming.spi.NamingManager;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.jndi.NameParserImpl;
/**
* A read-only Context
* <p>
* This version assumes it and all its subcontext are
* read-only and any attempt to modify (e.g. through bind) will result in an
* OperationNotSupportedException. Each Context in the tree builds a cache of
* the entries in all sub-contexts to optimise the performance of lookup.
* <p>
* <p>
* This implementation is intended to optimise the performance of lookup(String)
* to about the level of a HashMap get. It has been observed that the scheme
* resolution phase performed by the JVM takes considerably longer, so for
* optimum performance lookups should be coded like:
* </p>
* <code>
* Context componentContext = (Context)new InitialContext().lookup("java:comp");
* String envEntry = (String) componentContext.lookup("env/myEntry");
* String envEntry2 = (String) componentContext.lookup("env/myEntry2");
* </code>
*/
@SuppressWarnings("unchecked")
public class TestContext implements Context, Serializable {
public static final String SEPARATOR = "/";
protected static final NameParser NAME_PARSER = new NameParserImpl();
private static final long serialVersionUID = -5754338187296859149L;
protected final Hashtable<String, Object> environment; // environment for this context
protected final Map<String, Object> bindings; // bindings at my level
protected final Map<String, Object> treeBindings; // all bindings under me
private boolean frozen;
private String nameInNamespace = "";
public TestContext() {
environment = new Hashtable<>();
bindings = new HashMap<>();
treeBindings = new HashMap<>();
}
public TestContext(Hashtable env) {
if (env == null) {
this.environment = new Hashtable<>();
}
else {
this.environment = new Hashtable<>(env);
}
this.bindings = Collections.EMPTY_MAP;
this.treeBindings = Collections.EMPTY_MAP;
}
public TestContext(Hashtable environment, Map<String, Object> bindings) {
if (environment == null) {
this.environment = new Hashtable<>();
}
else {
this.environment = new Hashtable<>(environment);
}
this.bindings = new HashMap<>();
treeBindings = new HashMap<>();
if (bindings != null) {
for (Map.Entry<String, Object> binding : bindings.entrySet()) {
try {
internalBind(binding.getKey(), binding.getValue());
}
catch (Throwable e) {
ActiveMQClientLogger.LOGGER.error("Failed to bind " + binding.getKey() + "=" + binding.getValue(), e);
}
}
}
frozen = true;
}
public TestContext(Hashtable environment, Map<String, Object> bindings, String nameInNamespace) {
this(environment, bindings);
this.nameInNamespace = nameInNamespace;
}
protected TestContext(TestContext clone, Hashtable env) {
this.bindings = clone.bindings;
this.treeBindings = clone.treeBindings;
this.environment = new Hashtable<>(env);
}
protected TestContext(TestContext clone, Hashtable<String, Object> env, String nameInNamespace) {
this(clone, env);
this.nameInNamespace = nameInNamespace;
}
public void freeze() {
frozen = true;
}
boolean isFrozen() {
return frozen;
}
/**
* internalBind is intended for use only during setup or possibly by
* suitably synchronized superclasses. It binds every possible lookup into a
* map in each context. To do this, each context strips off one name segment
* and if necessary creates a new context for it. Then it asks that context
* to bind the remaining name. It returns a map containing all the bindings
* from the next context, plus the context it just created (if it in fact
* created it). (the names are suitably extended by the segment originally
* lopped off).
*
* @param name
* @param value
* @return
* @throws NamingException
*/
protected Map<String, Object> internalBind(String name, Object value) throws NamingException {
assert name != null && name.length() > 0;
Map<String, Object> newBindings = new HashMap<>();
int pos = name.indexOf('/');
if (pos == -1) {
if (treeBindings.put(name, value) != null) {
throw new NamingException("Something already bound at " + name);
}
bindings.put(name, value);
newBindings.put(name, value);
}
else {
String segment = name.substring(0, pos);
assert segment != null;
assert !segment.equals("");
Object o = treeBindings.get(segment);
if (o == null) {
o = newContext();
treeBindings.put(segment, o);
bindings.put(segment, o);
newBindings.put(segment, o);
}
else if (!(o instanceof TestContext)) {
throw new NamingException("Something already bound where a subcontext should go");
}
TestContext readOnlyContext = (TestContext) o;
String remainder = name.substring(pos + 1);
Map<String, Object> subBindings = readOnlyContext.internalBind(remainder, value);
for (Map.Entry<String, Object> entry : subBindings.entrySet()) {
String subName = segment + "/" + entry.getKey();
Object bound = entry.getValue();
treeBindings.put(subName, bound);
newBindings.put(subName, bound);
}
}
return newBindings;
}
protected TestContext newContext() {
return new TestContext();
}
@Override
public Object addToEnvironment(String propName, Object propVal) throws NamingException {
return environment.put(propName, propVal);
}
@Override
public Hashtable<String, Object> getEnvironment() throws NamingException {
return (Hashtable<String, Object>) environment.clone();
}
@Override
public Object removeFromEnvironment(String propName) throws NamingException {
return environment.remove(propName);
}
@Override
public Object lookup(String name) throws NamingException {
if (name.length() == 0) {
return this;
}
Object result = treeBindings.get(name);
if (result == null) {
result = bindings.get(name);
}
if (result == null) {
int pos = name.indexOf(':');
if (pos > 0) {
String scheme = name.substring(0, pos);
Context ctx = NamingManager.getURLContext(scheme, environment);
if (ctx == null) {
throw new NamingException("scheme " + scheme + " not recognized");
}
return ctx.lookup(name);
}
else {
// Split out the first name of the path
// and look for it in the bindings map.
CompositeName path = new CompositeName(name);
if (path.size() == 0) {
return this;
}
else {
String first = path.get(0);
Object obj = bindings.get(first);
if (obj == null) {
throw new NameNotFoundException(name);
}
else if (obj instanceof Context && path.size() > 1) {
Context subContext = (Context) obj;
obj = subContext.lookup(path.getSuffix(1));
}
return obj;
}
}
}
if (result instanceof LinkRef) {
LinkRef ref = (LinkRef) result;
result = lookup(ref.getLinkName());
}
if (result instanceof Reference) {
try {
result = NamingManager.getObjectInstance(result, null, null, this.environment);
}
catch (NamingException e) {
throw e;
}
catch (Exception e) {
throw (NamingException) new NamingException("could not look up : " + name).initCause(e);
}
}
if (result instanceof TestContext) {
String prefix = getNameInNamespace();
if (prefix.length() > 0) {
prefix = prefix + SEPARATOR;
}
result = new TestContext((TestContext) result, environment, prefix + name);
}
return result;
}
@Override
public Object lookup(Name name) throws NamingException {
return lookup(name.toString());
}
@Override
public Object lookupLink(String name) throws NamingException {
return lookup(name);
}
@Override
public Name composeName(Name name, Name prefix) throws NamingException {
Name result = (Name) prefix.clone();
result.addAll(name);
return result;
}
@Override
public String composeName(String name, String prefix) throws NamingException {
CompositeName result = new CompositeName(prefix);
result.addAll(new CompositeName(name));
return result.toString();
}
@Override
public NamingEnumeration<NameClassPair> list(String name) throws NamingException {
Object o = lookup(name);
if (o == this) {
return new ListEnumeration();
}
else if (o instanceof Context) {
return ((Context) o).list("");
}
else {
throw new NotContextException();
}
}
@Override
public NamingEnumeration<Binding> listBindings(String name) throws NamingException {
Object o = lookup(name);
if (o == this) {
return new ListBindingEnumeration();
}
else if (o instanceof Context) {
return ((Context) o).listBindings("");
}
else {
throw new NotContextException();
}
}
@Override
public Object lookupLink(Name name) throws NamingException {
return lookupLink(name.toString());
}
@Override
public NamingEnumeration<NameClassPair> list(Name name) throws NamingException {
return list(name.toString());
}
@Override
public NamingEnumeration<Binding> listBindings(Name name) throws NamingException {
return listBindings(name.toString());
}
@Override
public void bind(Name name, Object obj) throws NamingException {
internalBind(name.toString(), obj);
}
@Override
public void bind(String name, Object obj) throws NamingException {
internalBind(name, obj);
}
@Override
public void close() throws NamingException {
// ignore
}
@Override
public Context createSubcontext(Name name) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public Context createSubcontext(String name) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void destroySubcontext(Name name) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void destroySubcontext(String name) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public String getNameInNamespace() throws NamingException {
return nameInNamespace;
}
@Override
public NameParser getNameParser(Name name) throws NamingException {
return NAME_PARSER;
}
@Override
public NameParser getNameParser(String name) throws NamingException {
return NAME_PARSER;
}
@Override
public void rebind(Name name, Object obj) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void rebind(String name, Object obj) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void rename(Name oldName, Name newName) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void rename(String oldName, String newName) throws NamingException {
throw new OperationNotSupportedException();
}
@Override
public void unbind(Name name) throws NamingException {
treeBindings.remove(name.toString());
}
@Override
public void unbind(String name) throws NamingException {
treeBindings.remove(name);
}
private abstract class LocalNamingEnumeration implements NamingEnumeration {
private final Iterator<Map.Entry<String, Object>> i = bindings.entrySet().iterator();
@Override
public boolean hasMore() throws NamingException {
return i.hasNext();
}
@Override
public boolean hasMoreElements() {
return i.hasNext();
}
protected Map.Entry<String, Object> getNext() {
return i.next();
}
@Override
public void close() throws NamingException {
}
}
private class ListEnumeration extends LocalNamingEnumeration {
ListEnumeration() {
}
@Override
public Object next() throws NamingException {
return nextElement();
}
@Override
public Object nextElement() {
Map.Entry<String, Object> entry = getNext();
return new NameClassPair(entry.getKey(), entry.getValue().getClass().getName());
}
}
private class ListBindingEnumeration extends LocalNamingEnumeration {
ListBindingEnumeration() {
}
@Override
public Object next() throws NamingException {
return nextElement();
}
@Override
public Object nextElement() {
Map.Entry<String, Object> entry = getNext();
return new Binding(entry.getKey(), entry.getValue());
}
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.common.testjndi;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jndi.LazyCreateContext;
import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
/**
* A factory of the ActiveMQ Artemis InitialContext which contains
* {@link ConnectionFactory} instances as well as a child context called
* <i>destinations</i> which contain all of the current active destinations, in
* child context depending on the QoS such as transient or durable and queue or
* topic.
*/
public class TestContextFactory implements InitialContextFactory {
public static final String REFRESH_TIMEOUT = "refreshTimeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discoveryInitialWaitTimeout";
public static final String DYNAMIC_QUEUE_CONTEXT = "dynamicQueues";
public static final String DYNAMIC_TOPIC_CONTEXT = "dynamicTopics";
private String connectionFactoryPrefix = "connectionFactory.";
private String queuePrefix = "queue.";
private String topicPrefix = "topic.";
@Override
public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException {
// lets create a factory
Map<String, Object> data = new ConcurrentHashMap<>();
for (Map.Entry<?, ?> entry : environment.entrySet()) {
String key = entry.getKey().toString();
if (key.startsWith(connectionFactoryPrefix)) {
String jndiName = key.substring(connectionFactoryPrefix.length());
try {
ConnectionFactory factory = createConnectionFactory((String) environment.get(key), jndiName);
data.put(jndiName, factory);
}
catch (Exception e) {
e.printStackTrace();
throw new NamingException("Invalid broker URL");
}
}
}
createQueues(data, environment);
createTopics(data, environment);
data.put(DYNAMIC_QUEUE_CONTEXT, new LazyCreateContext() {
private static final long serialVersionUID = 6503881346214855588L;
@Override
protected Object createEntry(String name) {
return ActiveMQJMSClient.createQueue(name);
}
});
data.put(DYNAMIC_TOPIC_CONTEXT, new LazyCreateContext() {
private static final long serialVersionUID = 2019166796234979615L;
@Override
protected Object createEntry(String name) {
return ActiveMQJMSClient.createTopic(name);
}
});
return createContext(environment, data);
}
// Properties
// -------------------------------------------------------------------------
public String getTopicPrefix() {
return topicPrefix;
}
public void setTopicPrefix(String topicPrefix) {
this.topicPrefix = topicPrefix;
}
public String getQueuePrefix() {
return queuePrefix;
}
public void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
// Implementation methods
// -------------------------------------------------------------------------
protected Context createContext(Hashtable<?, ?> environment, Map<String, Object> data) {
return new TestContext(environment, data);
}
protected void createQueues(Map<String, Object> data, Hashtable<?, ?> environment) {
for (Map.Entry<?, ?> entry : environment.entrySet()) {
String key = entry.getKey().toString();
if (key.startsWith(queuePrefix)) {
String jndiName = key.substring(queuePrefix.length());
data.put(jndiName, createQueue(entry.getValue().toString()));
}
}
}
protected void createTopics(Map<String, Object> data, Hashtable<?, ?> environment) {
for (Map.Entry<?, ?> entry : environment.entrySet()) {
String key = entry.getKey().toString();
if (key.startsWith(topicPrefix)) {
String jndiName = key.substring(topicPrefix.length());
data.put(jndiName, createTopic(entry.getValue().toString()));
}
}
}
/**
* Factory method to create new Queue instances
*/
protected Queue createQueue(String name) {
return ActiveMQJMSClient.createQueue(name);
}
/**
* Factory method to create new Topic instances
*/
protected Topic createTopic(String name) {
return ActiveMQJMSClient.createTopic(name);
}
/**
* Factory method to create a new connection factory from the given environment
*/
protected ConnectionFactory createConnectionFactory(String uri, String name) throws Exception {
ConnectionFactoryParser parser = new ConnectionFactoryParser();
return parser.newObject(parser.expandURI(uri), name);
}
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.objectweb.jtests.jms.admin.Admin;
/**
* AbstractAdmin.
*/
public class AbstractAdmin implements Admin {
@Override
public String getName() {
return getClass().getName();
}
@Override
public void start() {
}
@Override
public void stop() throws Exception {
}
@Override
public InitialContext createContext() throws NamingException {
return new InitialContext();
}
@Override
public void createConnectionFactory(final String name) {
throw new RuntimeException("FIXME NYI createConnectionFactory");
}
@Override
public void deleteConnectionFactory(final String name) {
throw new RuntimeException("FIXME NYI deleteConnectionFactory");
}
@Override
public void createQueue(final String name) {
throw new RuntimeException("FIXME NYI createQueue");
}
@Override
public void deleteQueue(final String name) {
throw new RuntimeException("FIXME NYI deleteQueue");
}
@Override
public void createQueueConnectionFactory(final String name) {
createConnectionFactory(name);
}
@Override
public void deleteQueueConnectionFactory(final String name) {
deleteConnectionFactory(name);
}
@Override
public void createTopic(final String name) {
throw new RuntimeException("FIXME NYI createTopic");
}
@Override
public void deleteTopic(final String name) {
throw new RuntimeException("FIXME NYI deleteTopic");
}
@Override
public void createTopicConnectionFactory(final String name) {
createConnectionFactory(name);
}
@Override
public void deleteTopicConnectionFactory(final String name) {
deleteConnectionFactory(name);
}
@Override
public void startServer() {
}
@Override
public void stopServer() {
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.common.AbstractAdmin;
import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
public class ActiveMQCoreAdmin extends AbstractAdmin {
private Context context;
Hashtable<String, String> jndiProps = new Hashtable<>();
public ActiveMQCoreAdmin() {
super();
jndiProps.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
try {
Hashtable<String, String> env = new Hashtable<>();
env.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
env.put("java.naming.provider.url", "tcp://localhost:61616");
context = new InitialContext(env);
}
catch (NamingException e) {
e.printStackTrace();
}
}
@Override
public void start() throws Exception {
super.start();
}
@Override
public void stop() throws Exception {
super.stop();
}
@Override
public void createConnectionFactory(final String name) {
createConnection(name, 0);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=CF");
}
private void createConnection(final String name, final int cfType) {
try {
invokeSyncOperation(ResourceNames.JMS_SERVER, "createConnectionFactory", name, false, false, cfType, "netty", name);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public Context createContext() throws NamingException {
return new InitialContext(jndiProps);
}
@Override
public void createQueue(final String name) {
super.createQueue(name);
jndiProps.put("queue." + name, name);
}
@Override
public void createQueueConnectionFactory(final String name) {
createConnection(name, 1);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=QUEUE_CF");
}
@Override
public void createTopic(final String name) {
super.createTopic(name);
jndiProps.put("topic." + name, name);
}
@Override
public void createTopicConnectionFactory(final String name) {
createConnection(name, 2);
jndiProps.put("connectionFactory." + name, "tcp://127.0.0.1:61616?type=TOPIC_CF");
}
@Override
public void deleteConnectionFactory(final String name) {
try {
invokeSyncOperation(ResourceNames.JMS_SERVER, "destroyConnectionFactory", name);
jndiProps.remove("connectionFactory." + name);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public void deleteQueue(final String name) {
super.deleteQueue(name);
jndiProps.remove("queue." + name);
}
@Override
public void deleteQueueConnectionFactory(final String name) {
deleteConnectionFactory(name);
jndiProps.remove("connectionFactory." + name);
}
@Override
public void deleteTopic(final String name) {
super.deleteTopic(name);
jndiProps.remove("topic." + name);
}
@Override
public void deleteTopicConnectionFactory(final String name) {
deleteConnectionFactory(name);
jndiProps.remove("connectionFactory." + name);
}
@Override
public String getName() {
return this.getClass().getName();
}
// Inner classes -------------------------------------------------
}

View File

@ -1,129 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms;
import javax.naming.Context;
import javax.naming.NamingException;
import org.jboss.logging.Logger;
import org.objectweb.jtests.jms.admin.Admin;
/**
* GenericAdmin.
*
* @FIXME delegate to a JBoss defined admin class
*/
public class GenericAdmin implements Admin {
public static final Logger log = Logger.getLogger(GenericAdmin.class);
public static Admin delegate = new AbstractAdmin();
@Override
public String getName() {
String name = GenericAdmin.delegate.getName();
GenericAdmin.log.debug("Using admin '" + name + "' delegate=" + GenericAdmin.delegate);
return name;
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public Context createContext() throws NamingException {
Context ctx = GenericAdmin.delegate.createContext();
GenericAdmin.log.debug("Using initial context: " + ctx.getEnvironment());
return ctx;
}
@Override
public void createConnectionFactory(final String name) {
GenericAdmin.log.debug("createConnectionFactory '" + name + "'");
GenericAdmin.delegate.createConnectionFactory(name);
}
@Override
public void deleteConnectionFactory(final String name) {
GenericAdmin.log.debug("deleteConnectionFactory '" + name + "'");
GenericAdmin.delegate.deleteConnectionFactory(name);
}
@Override
public void createQueue(final String name) {
GenericAdmin.log.debug("createQueue '" + name + "'");
GenericAdmin.delegate.createQueue(name);
}
@Override
public void deleteQueue(final String name) {
GenericAdmin.log.debug("deleteQueue '" + name + "'");
GenericAdmin.delegate.deleteQueue(name);
}
@Override
public void createQueueConnectionFactory(final String name) {
GenericAdmin.log.debug("createQueueConnectionFactory '" + name + "'");
GenericAdmin.delegate.createQueueConnectionFactory(name);
}
@Override
public void deleteQueueConnectionFactory(final String name) {
GenericAdmin.log.debug("deleteQueueConnectionFactory '" + name + "'");
GenericAdmin.delegate.deleteQueueConnectionFactory(name);
}
@Override
public void createTopic(final String name) {
GenericAdmin.log.debug("createTopic '" + name + "'");
GenericAdmin.delegate.createTopic(name);
}
@Override
public void deleteTopic(final String name) {
GenericAdmin.log.debug("deleteTopic '" + name + "'");
GenericAdmin.delegate.deleteTopic(name);
}
@Override
public void createTopicConnectionFactory(final String name) {
GenericAdmin.log.debug("createTopicConnectionFactory '" + name + "'");
GenericAdmin.delegate.createTopicConnectionFactory(name);
}
@Override
public void deleteTopicConnectionFactory(final String name) {
GenericAdmin.log.debug("deleteTopicConnectionFactory '" + name + "'");
GenericAdmin.delegate.deleteTopicConnectionFactory(name);
}
@Override
public void startServer() throws Exception {
GenericAdmin.log.debug("startEmbeddedServer");
GenericAdmin.delegate.startServer();
}
@Override
public void stopServer() throws Exception {
GenericAdmin.log.debug("stopEmbeddedServer");
GenericAdmin.delegate.stopServer();
}
}

View File

@ -49,7 +49,7 @@ import org.objectweb.jtests.jms.framework.JMSTestCase;
@RunWith(Suite.class)
@SuiteClasses({TopicConnectionTest.class, ConnectionTest.class, MessageBodyTest.class, MessageDefaultTest.class, MessageTypeTest.class, MessageHeaderTest.class, JMSXPropertyTest.class, MessagePropertyConversionTest.class, MessagePropertyTest.class, QueueBrowserTest.class, TemporaryQueueTest.class, SelectorSyntaxTest.class, SelectorTest.class, QueueSessionTest.class, SessionTest.class, TopicSessionTest.class, UnifiedSessionTest.class, TemporaryTopicTest.class,})
public class JoramAggregationTest extends Assert {
public class JoramCoreAggregationTest extends Assert {
/**
* Should be overridden
@ -58,7 +58,7 @@ public class JoramAggregationTest extends Assert {
*/
protected static Properties getProviderProperties() throws IOException {
Properties props = new Properties();
props.load(ClassLoader.getSystemResourceAsStream(JMSTestCase.PROP_FILE_NAME));
props.load(ClassLoader.getSystemResourceAsStream(JMSTestCase.getPropFileName()));
return props;
}
@ -66,6 +66,7 @@ public class JoramAggregationTest extends Assert {
@BeforeClass
public static void setUpServer() throws Exception {
JMSTestCase.setPropFileName("provider.properties");
JMSTestCase.startServer = false;
// Admin step
// gets the provider administration wrapper...

View File

@ -35,7 +35,16 @@ import org.objectweb.jtests.jms.admin.AdminFactory;
*/
public abstract class JMSTestCase extends Assert {
public static final String PROP_FILE_NAME = "provider.properties";
public static String _PROP_FILE_NAME = "provider.properties";
public static String getPropFileName() {
return System.getProperty("joram.provider", _PROP_FILE_NAME);
}
public static void setPropFileName(String fileName) {
System.setProperty("joram.provider", fileName);
_PROP_FILE_NAME = fileName;
}
public static boolean startServer = true;
@ -70,7 +79,7 @@ public abstract class JMSTestCase extends Assert {
*/
protected Properties getProviderProperties() throws IOException {
Properties props = new Properties();
props.load(ClassLoader.getSystemResourceAsStream(JMSTestCase.PROP_FILE_NAME));
props.load(ClassLoader.getSystemResourceAsStream(getPropFileName()));
return props;
}

View File

@ -24,11 +24,7 @@ import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
import org.junit.After;
import org.junit.Before;
@ -115,11 +111,7 @@ public abstract class PTPTestCase extends JMSTestCase {
admin.createQueueConnectionFactory(PTPTestCase.QCF_NAME);
admin.createQueue(PTPTestCase.QUEUE_NAME);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("connectionFactory." + PTPTestCase.QCF_NAME, "tcp://127.0.0.1:61616?type=QUEUE_CF");
props.put("queue." + PTPTestCase.QUEUE_NAME, PTPTestCase.QUEUE_NAME);
Context ctx = new InitialContext(props);
Context ctx = admin.createContext();
senderQCF = (QueueConnectionFactory) ctx.lookup(PTPTestCase.QCF_NAME);
senderQueue = (Queue) ctx.lookup(PTPTestCase.QUEUE_NAME);

View File

@ -24,11 +24,7 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
import org.junit.After;
import org.junit.Before;
@ -115,11 +111,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
admin.createTopicConnectionFactory(PubSubTestCase.TCF_NAME);
admin.createTopic(PubSubTestCase.TOPIC_NAME);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("connectionFactory." + PubSubTestCase.TCF_NAME, "tcp://127.0.0.1:61616?type=TOPIC_CF");
props.put("topic." + PubSubTestCase.TOPIC_NAME, PubSubTestCase.TOPIC_NAME);
Context ctx = new InitialContext(props);
Context ctx = admin.createContext();
publisherTCF = (TopicConnectionFactory) ctx.lookup(PubSubTestCase.TCF_NAME);
publisherTopic = (Topic) ctx.lookup(PubSubTestCase.TOPIC_NAME);
@ -140,6 +132,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
// end of client step
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

View File

@ -27,11 +27,7 @@ import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Hashtable;
import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
import org.junit.After;
import org.junit.Before;
@ -165,15 +161,7 @@ public abstract class UnifiedTestCase extends JMSTestCase {
admin.createQueue(UnifiedTestCase.QUEUE_NAME);
admin.createTopic(UnifiedTestCase.TOPIC_NAME);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("connectionFactory." + UnifiedTestCase.CF_NAME, "tcp://127.0.0.1:61616");
props.put("connectionFactory." + UnifiedTestCase.QCF_NAME, "tcp://127.0.0.1:61616?type=QUEUE_CF");
props.put("connectionFactory." + UnifiedTestCase.TCF_NAME, "tcp://127.0.0.1:61616?type=TOPIC_CF");
props.put("queue." + UnifiedTestCase.DESTINATION_NAME, UnifiedTestCase.DESTINATION_NAME);
props.put("queue." + UnifiedTestCase.QUEUE_NAME, UnifiedTestCase.QUEUE_NAME);
props.put("topic." + UnifiedTestCase.TOPIC_NAME, UnifiedTestCase.TOPIC_NAME);
Context ctx = new InitialContext(props);
Context ctx = admin.createContext();
producerCF = (ConnectionFactory) ctx.lookup(UnifiedTestCase.CF_NAME);
// we see destination of the unified domain as a javax.jms.Destination

View File

@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##
# This property is used to chose which provider is to be tested
# Uncomment the chosen provider and comment the other ones
##
jms.provider.admin.class=org.apache.activemq.artemis.amqpJMS.ActiveMQAMQPAdmin
#jms.provider.admin.class = org.apache.activemq.artemis.api.jms.GenericAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.JoramAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.AshnaMQAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.FioranoMQAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.PramatiAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.SwiftMQAdmin

View File

@ -19,7 +19,7 @@
# Uncomment the chosen provider and comment the other ones
##
jms.provider.admin.class=org.apache.activemq.artemis.jms.ActiveMQAdmin
jms.provider.admin.class=org.apache.activemq.artemis.jms.ActiveMQCoreAdmin
#jms.provider.admin.class = org.apache.activemq.artemis.api.jms.GenericAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.JoramAdmin
#jms.provider.admin.class = org.objectweb.jtests.providers.admin.AshnaMQAdmin

View File

@ -18,4 +18,4 @@
# Time in milliseconds or 0 for never expiring
# Default is set to 30 seconds (long enough to receive slow messages
# and won't hang up tests infinitely)
timeout = 30000
timeout = 1000

View File

@ -17,11 +17,11 @@
package org.apache.activemq.artemis.tests.unit.jms.client;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.junit.Test;
import org.junit.Assert;
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
public class SelectorTranslatorTest extends ActiveMQTestBase {