OSGi support based on branch from gnodet

This commit is contained in:
Christian Schneider 2015-11-24 11:12:10 +01:00
parent 37e7ba9f12
commit be9dad3f2b
31 changed files with 1066 additions and 12 deletions

View File

@ -38,3 +38,8 @@ If you are trying to copy the examples somewhere else and modifying them. Consid
# if trying to modify the 'topic' example:
cd examples/jms/topic && mvn dependency:list
## Artemis on Apache Karaf
feature:repo-add mvn:org.apache.activemq/artemis-features/1.1.1-SNAPSHOT/xml
feature:install artemis-core artemis-hornetq artemis-stomp artemis-mqtt artemis-amqp

View File

@ -41,7 +41,8 @@
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<scope>compile</scope>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>

55
artemis-features/pom.xml Normal file
View File

@ -0,0 +1,55 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>artemis-features</artifactId>
<packaging>pom</packaging>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>filter</id>
<phase>generate-resources</phase>
<goals>
<goal>resources</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>attach-artifacts</id>
<phase>package</phase>
<goals>
<goal>attach-artifact</goal>
</goals>
<configuration>
<artifacts>
<artifact>
<file>target/classes/features.xml</file>
<type>xml</type>
</artifact>
</artifacts>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<features name="artemis-${pom.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.3.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0 http://karaf.apache.org/xmlns/features/v1.3.0">
<feature name="netty-core" version="${netty.version}">
<bundle>mvn:io.netty/netty-common/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-buffer/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
</feature>
<feature name="artemis-core" version="${pom.version}">
<feature>transaction</feature>
<feature>netty-core</feature>
<feature>scr</feature>
<bundle>mvn:org.apache.geronimo.specs/geronimo-jms_2.0_spec/${geronimo.jms.2.spec.version}</bundle>
<bundle>mvn:com.google.guava/guava/18.0</bundle>
<bundle>mvn:io.netty/netty-codec-http/${netty.version}</bundle>
<bundle>mvn:commons-beanutils/commons-beanutils/1.9.2</bundle>
<bundle>mvn:commons-collections/commons-collections/3.2.1</bundle>
<bundle>mvn:org.jboss.logging/jboss-logging/3.1.4.GA</bundle>
<!-- <bundle>wrap:mvn:org.jboss.modules/jboss-modules/1.3.1.Final</bundle> -->
<!-- <bundle>mvn:org.jboss.logmanager/jboss-logmanager/1.5.3.Final</bundle>-->
<bundle>mvn:org.jgroups/jgroups/3.6.0.Final</bundle>
<bundle>mvn:org.apache.activemq/artemis-native/${pom.version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-server-osgi/${pom.version}</bundle>
</feature>
<feature name="artemis-amqp" version="${pom.version}">
<feature>artemis-core</feature>
<bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.qpid/0.32_1</bundle>
<bundle>wrap:mvn:org.apache.qpid/proton-j/0.10</bundle>
<bundle>wrap:mvn:org.apache.qpid/proton-jms/0.10</bundle>
<bundle>mvn:org.apache.activemq/artemis-proton-plug/${pom.version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-amqp-protocol/${pom.version}</bundle>
</feature>
<feature name="artemis-stomp" version="${pom.version}">
<feature>artemis-core</feature>
<bundle>mvn:org.apache.activemq/artemis-stomp-protocol/${pom.version}</bundle>
</feature>
<feature name="artemis-mqtt" version="${pom.version}">
<feature>artemis-core</feature>
<bundle>mvn:org.apache.activemq/artemis-mqtt-protocol/${pom.version}</bundle>
</feature>
<feature name="artemis-hornetq" version="${pom.version}">
<feature>artemis-core</feature>
<bundle>mvn:org.apache.activemq/artemis-hqclient-protocol/${pom.version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-hornetq-protocol/${pom.version}</bundle>
</feature>
</features>

View File

@ -25,14 +25,8 @@
</parent>
<artifactId>artemis-native</artifactId>
<packaging>jar</packaging>
<packaging>bundle</packaging>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>

View File

@ -26,8 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* This class is used as an aggregator for the {@link LibaioFile}.
* <br>
@ -309,6 +307,17 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
return new LibaioFile<>(res, null);
}
/**
* Checks that the given argument is not null. If it is, throws {@link NullPointerException}.
* Otherwise, returns the argument.
*/
private static <T> T checkNotNull(T arg, String text) {
if (arg == null) {
throw new NullPointerException(text);
}
return arg;
}
/**
* It will poll the libaio queue for results. It should block until min is reached

View File

@ -101,4 +101,5 @@
</dependencies>
<packaging>bundle</packaging>
</project>

View File

@ -21,10 +21,13 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
import java.util.Collections;
import java.util.List;
@Component(service=ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
private static final String AMQP_PROTOCOL_NAME = "AMQP";

View File

@ -23,6 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-hornetq-protocol</artifactId>
<packaging>bundle</packaging>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>

View File

@ -22,7 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
public static final String HORNETQ_PROTOCOL_NAME = "HORNETQ";
@ -31,6 +34,7 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
private static String[] SUPPORTED_PROTOCOLS = {HORNETQ_PROTOCOL_NAME};
@Override
public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors) {

View File

@ -23,6 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-hqclient-protocol</artifactId>
<packaging>bundle</packaging>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
@ -50,4 +51,19 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
<!-- <Bundle-Activator>org.apache.activemq.artemis.core.protocol.hornetq.Activator</Bundle-Activator> -->
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -21,7 +21,9 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service = ClientProtocolManagerFactory.class)
public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory {

View File

@ -23,6 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>artemis-mqtt-protocol</artifactId>
<packaging>bundle</packaging>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
@ -52,4 +53,21 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Embed-Dependency>netty-codec-mqtt</Embed-Dependency>
<Export-Package>!*</Export-Package>
<Import-Package>io.netty.*;version="[4,6)", *</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -22,7 +22,9 @@ import java.util.List;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service=ProtocolManagerFactory.class)
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory {
public static final String MQTT_PROTOCOL_NAME = "MQTT";

View File

@ -63,4 +63,5 @@
</dependency>
</dependencies>
<packaging>bundle</packaging>
</project>

View File

@ -24,7 +24,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";

View File

@ -110,4 +110,5 @@
</dependencies>
<packaging>bundle</packaging>
</project>

View File

@ -50,4 +50,5 @@
</dependency>
</dependencies>
<packaging>bundle</packaging>
</project>

View File

@ -22,7 +22,10 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<StompFrameInterceptor> {
public static final String STOMP_PROTOCOL_NAME = "STOMP";

1
artemis-server-osgi/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target/

123
artemis-server-osgi/pom.xml Normal file
View File

@ -0,0 +1,123 @@
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>artemis-server-osgi</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ Artemis Server OSGi</name>
<description>
Combines the commons, core-client and server jars as they contain too many duplicate packages
to be deployed separately.
</description>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-selector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-service-extensions</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.modules</groupId>
<artifactId>jboss-modules</artifactId>
<version>1.3.1.Final</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
<Bundle-Activator>org.apache.activemq.artemis.osgi.Activator</Bundle-Activator>
<_exportcontents>*</_exportcontents>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,30 @@
package org.apache.activemq.artemis.osgi;
import java.util.Hashtable;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.ManagedServiceFactory;
public class Activator implements BundleActivator {
ArtemisBrokerFactory factory;
ServiceRegistration<ManagedServiceFactory> registration;
@Override
public void start(BundleContext context) throws Exception {
factory = new ArtemisBrokerFactory(context);
Hashtable<String, String> props = new Hashtable<>();
props.put(Constants.SERVICE_PID, ArtemisBrokerFactory.PID);
registration = context.registerService(ManagedServiceFactory.class, factory, props);
}
@Override
public void stop(BundleContext context) throws Exception {
registration.unregister();
factory.destroy();
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.util.Dictionary;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.jboss.logging.Logger;
import org.osgi.framework.BundleContext;
public class ArtemisBrokerFactory extends BaseManagedServiceFactory<OsgiBroker, OsgiBroker> {
public static final String PID = "org.apache.activemq.artemis";
private static final Logger LOGGER = Logger.getLogger(ArtemisBrokerFactory.class);
public ArtemisBrokerFactory(BundleContext context) {
super(context, ArtemisBrokerFactory.class.getName());
}
@Override
protected OsgiBroker doCreate(Dictionary<String, ?> properties) throws Exception {
String config = getMandatory(properties, "config");
String name = getMandatory(properties, "name");
String domain = getMandatory(properties, "domain");
ActiveMQJAASSecurityManager security = new ActiveMQJAASSecurityManager(domain);
String serverInstanceDir = null;
String karafDataDir = System.getProperty("karaf.data");
if (karafDataDir != null) {
serverInstanceDir = karafDataDir + "/artemis/" + name;
}
OsgiBroker server = new OsgiBroker(getContext(), name, serverInstanceDir, config, security);
server.start();
return server;
}
private String getMandatory(Dictionary<String, ?> properties, String key) {
String value = (String) properties.get(key);
if (value == null) {
throw new IllegalStateException("Property " + key + " must be set");
}
return value;
}
@Override
protected void doDestroy(OsgiBroker broker) throws Exception {
broker.stop();
}
@Override
protected OsgiBroker register(OsgiBroker broker, Dictionary<String, ?> properties) {
broker.register(getContext(), properties);
return broker;
}
@Override
protected void unregister(OsgiBroker broker) {
broker.unregister();
}
@Override
protected void warn(String message, Throwable t) {
LOGGER.warn(message, t);
}
@Override
protected void info(String message, Throwable t) {
LOGGER.info(message, t);
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.util.Dictionary;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedServiceFactory;
public abstract class BaseManagedServiceFactory<T, R> implements ManagedServiceFactory {
public static final long DEFAULT_TIMEOUT_BEFORE_INTERRUPT = 30000;
private final BundleContext context;
private final String name;
private final long timeoutBeforeInterrupt;
private final AtomicBoolean destroyed;
private final ExecutorService executor;
private final Map<String, Pair<T, R>> services;
public BaseManagedServiceFactory(BundleContext context, String name) {
this(context, name, DEFAULT_TIMEOUT_BEFORE_INTERRUPT);
}
public BaseManagedServiceFactory(BundleContext context, String name, long timeoutBeforeInterrupt) {
this.context = context;
this.name = name;
this.timeoutBeforeInterrupt = timeoutBeforeInterrupt;
this.destroyed = new AtomicBoolean(false);
this.executor = Executors.newSingleThreadExecutor();
this.services = new ConcurrentHashMap<>();
}
public String getName() {
return name;
}
public BundleContext getContext() {
return context;
}
public void updated(final String pid, final Dictionary<String, ?> properties) throws ConfigurationException {
if (destroyed.get()) {
return;
}
executor.submit(new Runnable() {
public void run() {
try {
internalUpdate(pid, properties);
} catch (Throwable t) {
warn("Error destroying service for ManagedServiceFactory " + getName(), t);
}
}
});
}
public void deleted(final String pid) {
if (destroyed.get()) {
return;
}
executor.submit(new Runnable() {
public void run() {
try {
internalDelete(pid);
} catch (Throwable throwable) {
warn("Error destroying service for ManagedServiceFactory " + getName(), throwable);
}
}
});
}
private void internalUpdate(String pid, Dictionary<String, ?> properties) {
Pair<T, R> pair = services.remove(pid);
if (pair != null) {
internalDelete(pid);
}
if (destroyed.get()) {
return;
}
try {
T t = doCreate(properties);
try {
if (destroyed.get()) {
throw new IllegalStateException("ManagedServiceFactory has been destroyed");
}
R registration = register(t, properties);
services.put(pid, new Pair<>(t, registration));
} catch (Throwable throwable1) {
try {
doDestroy(t);
} catch (Throwable throwable2) {
// Ignore
}
throw throwable1;
}
} catch (Throwable throwable) {
warn("Error creating service for ManagedServiceFactory " + getName(), throwable);
}
}
private void internalDelete(String pid) {
Pair<T, R> pair = services.remove(pid);
if (pair != null) {
try {
unregister(pair.getSecond());
} catch (Throwable t) {
info("Error unregistering service", t);
}
try {
doDestroy(pair.getFirst());
} catch (Throwable t) {
info("Error destroying service", t);
}
}
}
protected abstract T doCreate(Dictionary<String, ?> properties) throws Exception;
protected abstract void doDestroy(T t) throws Exception;
protected abstract R register(T t, Dictionary<String, ?> properties);
protected abstract void unregister(R registration);
protected abstract void warn(String message, Throwable t);
protected abstract void info(String message, Throwable t);
public void destroy() {
if (destroyed.compareAndSet(false, true)) {
executor.shutdown();
try {
executor.awaitTermination(timeoutBeforeInterrupt, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Shutdown interrupted");
}
if (!executor.isTerminated()) {
executor.shutdownNow();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Shutdown interrupted");
}
}
while (!services.isEmpty()) {
String pid = services.keySet().iterator().next();
internalDelete(pid);
}
}
}
static class Pair<U,V> {
private U first;
private V second;
public Pair(U first, V second) {
this.first = first;
this.second = second;
}
public U getFirst() {
return first;
}
public V getSecond() {
return second;
}
public void setFirst(U first) {
this.first = first;
}
public void setSecond(V second) {
this.second = second;
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.util.tracker.ServiceTracker;
@SuppressWarnings({"unchecked", "rawtypes"})
public class OsgiBroker {
private final String name;
private final String configurationUrl;
private final String brokerInstance;
private boolean started;
private final ActiveMQSecurityManager securityManager;
private Map<String, ActiveMQComponent> components;
private Map<String, ServiceRegistration<?>> registrations;
private BundleContext context;
private ServiceTracker tracker;
public OsgiBroker(BundleContext context, String name, String brokerInstance, String configuration, ActiveMQSecurityManager security) {
this.context = context;
this.name = name;
this.brokerInstance = brokerInstance;
this.securityManager = security;
this.configurationUrl = configuration;
}
public synchronized void start() throws Exception {
if (tracker != null) {
return;
}
// todo if we start to pullout more configs from the main config then we
// should pull out the configuration objects from factories if available
FileConfiguration configuration = new FileConfiguration();
if (brokerInstance != null) {
configuration.setBrokerInstance(new File(brokerInstance));
}
FileJMSConfiguration jmsConfiguration = new FileJMSConfiguration();
FileDeploymentManager fileDeploymentManager = new FileDeploymentManager(configurationUrl);
fileDeploymentManager.addDeployable(configuration).addDeployable(jmsConfiguration).readConfiguration();
components = fileDeploymentManager.buildService(securityManager, ManagementFactory.getPlatformMBeanServer());
final ActiveMQServer server = (ActiveMQServer)components.get("core");
String[] requiredProtocols = getRequiredProtocols(server.getConfiguration().getAcceptorConfigurations());
ProtocolTrackerCallBack callback = new ProtocolTrackerCallBack() {
@Override
public void addFactory(ProtocolManagerFactory<Interceptor> pmf) {
server.addProtocolManagerFactory(pmf);
}
@Override
public void removeFactory(ProtocolManagerFactory<Interceptor> pmf) {
server.removeProtocolManagerFactory(pmf);
}
@Override
public void stop() throws Exception {
ActiveMQComponent[] mqComponents = new ActiveMQComponent[components.size()];
components.values().toArray(mqComponents);
for (int i = mqComponents.length - 1; i >= 0; i--) {
mqComponents[i].stop();
}
}
@Override
public void start() throws Exception {
List<ActiveMQComponent> componentsByStartOrder = getComponentsByStartOrder(components);
for (ActiveMQComponent component : componentsByStartOrder) {
component.start();
}
}
@Override
public boolean isStarted() {
return server.isStarted();
}
};
ProtocolTracker trackerCust = new ProtocolTracker(name, context, requiredProtocols, callback);
tracker = new ServiceTracker(context, ProtocolManagerFactory.class, trackerCust);
tracker.open();
started = true;
}
private String[] getRequiredProtocols(Set<TransportConfiguration> acceptors) {
ArrayList<String> protocols = new ArrayList<String>();
for (TransportConfiguration acceptor : acceptors) {
String protoName = acceptor.getName().toUpperCase();
if (!"ARTEMIS".equals(protoName)) {
protocols.add(protoName);
}
}
return protocols.toArray(new String[]{});
}
public void stop() throws Exception {
if (!started) {
return;
}
tracker.close();
tracker = null;
}
public boolean isStarted() {
return started;
}
public Map<String, ActiveMQComponent> getComponents() {
return components;
}
/*
* this makes sure the components are started in the correct order. Its
* simple at the mo as e only have core and jms but will need impproving if
* we get more.
*/
public ArrayList<ActiveMQComponent> getComponentsByStartOrder(Map<String, ActiveMQComponent> components) {
ArrayList<ActiveMQComponent> activeMQComponents = new ArrayList<ActiveMQComponent>();
ActiveMQComponent jmsComponent = components.get("jms");
if (jmsComponent != null) {
activeMQComponents.add(jmsComponent);
}
activeMQComponents.add(components.get("core"));
return activeMQComponents;
}
public void register(BundleContext context, Dictionary<String, ?> properties) {
registrations = new HashMap<>();
for (Map.Entry<String, ActiveMQComponent> component : getComponents().entrySet()) {
String[] classes = getInterfaces(component.getValue());
Hashtable<String, Object> props = new Hashtable<>();
for (Enumeration<String> keyEnum = properties.keys(); keyEnum.hasMoreElements();) {
String key = keyEnum.nextElement();
Object val = properties.get(key);
props.put(key, val);
}
ServiceRegistration<?> registration = context.registerService(classes, component.getValue(), props);
registrations.put(component.getKey(), registration);
}
}
private String[] getInterfaces(ActiveMQComponent value) {
Set<String> interfaces = new HashSet<>();
getInterfaces(value.getClass(), interfaces);
return interfaces.toArray(new String[interfaces.size()]);
}
private void getInterfaces(Class<?> clazz, Set<String> interfaces) {
for (Class<?> itf : clazz.getInterfaces()) {
if (interfaces.add(itf.getName())) {
getInterfaces(itf, interfaces);
}
}
if (clazz.getSuperclass() != null) {
getInterfaces(clazz.getSuperclass(), interfaces);
}
}
public void unregister() {
if (registrations != null) {
for (ServiceRegistration<?> reg : registrations.values()) {
reg.unregister();
}
}
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
/**
* Tracks the available ProtocolManagerFactory services as well as the required protocols.
* When a new service appears the factory is added to the server.
* When all needed protocols are present the server is started.
* When required a service disappears the server is stopped.
*/
@SuppressWarnings("rawtypes")
public class ProtocolTracker implements ServiceTrackerCustomizer<ProtocolManagerFactory<Interceptor>, ProtocolManagerFactory<Interceptor>> {
private static Logger LOG = Logger.getLogger(ProtocolTracker.class.getName());
private String name;
private BundleContext context;
private Map<String, Boolean> protocols;
private ProtocolTrackerCallBack callback;
public ProtocolTracker(String name, BundleContext context, String[] requiredProtocols, ProtocolTrackerCallBack callback) {
this.name = name;
this.context = context;
this.callback = callback;
this.protocols = new HashMap<String, Boolean>();
for (String requiredProtocol : requiredProtocols) {
this.protocols.put(requiredProtocol, false);
}
LOG.info("Broker config " + name + " found. Tracking protocols " + Arrays.asList(requiredProtocols));
}
@Override
public ProtocolManagerFactory addingService(ServiceReference<ProtocolManagerFactory<Interceptor>> reference) {
ProtocolManagerFactory<Interceptor> pmf = context.getService(reference);
callback.addFactory(pmf);
for (String protocol : pmf.getProtocols()) {
protocolAdded(protocol);
}
return pmf;
}
@Override
public void modifiedService(ServiceReference<ProtocolManagerFactory<Interceptor>> reference, ProtocolManagerFactory<Interceptor> pmf) {
// Not supported
}
@Override
public void removedService(ServiceReference<ProtocolManagerFactory<Interceptor>> reference, ProtocolManagerFactory<Interceptor> pmf) {
for (String protocol : pmf.getProtocols()) {
protocolRemoved(protocol);
}
callback.removeFactory(pmf);
}
private void protocolAdded(String protocol) {
Boolean present = this.protocols.get(protocol);
if (present != null && !present) {
this.protocols.put(protocol, true);
List<String> missing = getMissing();
LOG.info("Required protocol " + protocol + " was added for broker " + name + ". " +
(missing.isEmpty() ? "Starting broker." : "Still waiting for " + missing));
if (missing.isEmpty()) {
try {
callback.start();
} catch (Exception e) {
LOG.log(Level.WARNING, "Error starting broker " + name, e);
}
}
}
}
private void protocolRemoved(String protocol) {
Boolean present = this.protocols.get(protocol);
if (present != null && present) {
List<String> missing = getMissing();
LOG.info("Required protocol " + protocol + " was removed for broker " + name + ". "
+ (missing.isEmpty() ? "Stopping broker. " : ""));
if (missing.isEmpty()) {
try {
callback.stop();
} catch (Exception e) {
LOG.log(Level.WARNING, "Error stopping broker " + name, e);
}
}
this.protocols.put(protocol, false);
}
}
private List<String> getMissing() {
List<String> missing = new ArrayList<String>();
for (String protocol : protocols.keySet()) {
Boolean present = protocols.get(protocol);
if (!present) {
missing.add(protocol);
}
}
return missing;
}
}

View File

@ -0,0 +1,10 @@
package org.apache.activemq.artemis.osgi;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
public interface ProtocolTrackerCallBack extends ActiveMQComponent {
void addFactory(ProtocolManagerFactory<Interceptor> pmf);
void removeFactory(ProtocolManagerFactory<Interceptor> pmf);
}

View File

@ -0,0 +1,58 @@
package org.apache.activemq.artemis.osgi;
import static org.easymock.EasyMock.expect;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
@SuppressWarnings({"rawtypes", "unchecked"})
public class ProtocolTrackerTest {
@Test
public void testLifecycle() throws Exception {
IMocksControl c = EasyMock.createControl();
BundleContext context = c.createMock(BundleContext.class);
String[] requiredProtocols = {"a", "b"};
ProtocolTrackerCallBack callback = c.createMock(ProtocolTrackerCallBack.class);
RefFact protA = new RefFact(c, context, new String[]{"a"});
RefFact protB = new RefFact(c, context, new String[]{"b"});
callback.addFactory(protA.factory);
EasyMock.expectLastCall();
callback.addFactory(protB.factory);
EasyMock.expectLastCall();
callback.start();
EasyMock.expectLastCall();
callback.removeFactory(protA.factory);
callback.stop();
EasyMock.expectLastCall();
c.replay();
ProtocolTracker tracker = new ProtocolTracker("test", context, requiredProtocols, callback);
tracker.addingService(protA.ref);
tracker.addingService(protB.ref);
tracker.removedService(protA.ref, protA.factory);
c.verify();
}
class RefFact {
ServiceReference<ProtocolManagerFactory<Interceptor>> ref;
ProtocolManagerFactory factory;
public RefFact(IMocksControl c, BundleContext context, String[] protocols) {
ref = c.createMock(ServiceReference.class);
factory = c.createMock(ProtocolManagerFactory.class);
expect(factory.getProtocols()).andReturn(protocols).atLeastOnce();
expect(context.getService(ref)).andReturn(factory).atLeastOnce();
}
}
}

View File

@ -37,7 +37,6 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.stomp.WebSocketServerHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler;
import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable;
@ -45,6 +44,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.stomp;
package org.apache.activemq.artemis.core.server.protocol.stomp;
import java.nio.charset.StandardCharsets;

27
pom.xml
View File

@ -48,11 +48,13 @@
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>artemis-server-osgi</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>artemis-distribution</module>
<module>tests</module>
<module>artemis-features</module>
</modules>
<name>ActiveMQ Artemis Parent</name>
@ -489,8 +491,27 @@
<version>1.2</version>
<!-- License: Apache 2.0 -->
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- OSGi support -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<version>6.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
<version>6.0.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<profiles>
<profile>
@ -1039,6 +1060,12 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.0</version>
<extensions>true</extensions>
</plugin>
</plugins>
</build>