Switch to DS

This commit is contained in:
Christian Schneider 2015-11-24 12:30:20 +01:00
parent be9dad3f2b
commit 61d1f49c70
7 changed files with 37 additions and 353 deletions

View File

@ -313,10 +313,10 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* Otherwise, returns the argument.
*/
private static <T> T checkNotNull(T arg, String text) {
if (arg == null) {
throw new NullPointerException(text);
}
return arg;
if (arg == null) {
throw new NullPointerException(text);
}
return arg;
}
/**

View File

@ -112,7 +112,6 @@
<configuration>
<instructions>
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
<Bundle-Activator>org.apache.activemq.artemis.osgi.Activator</Bundle-Activator>
<_exportcontents>*</_exportcontents>
</instructions>
</configuration>

View File

@ -1,30 +0,0 @@
package org.apache.activemq.artemis.osgi;
import java.util.Hashtable;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.ManagedServiceFactory;
public class Activator implements BundleActivator {
ArtemisBrokerFactory factory;
ServiceRegistration<ManagedServiceFactory> registration;
@Override
public void start(BundleContext context) throws Exception {
factory = new ArtemisBrokerFactory(context);
Hashtable<String, String> props = new Hashtable<>();
props.put(Constants.SERVICE_PID, ArtemisBrokerFactory.PID);
registration = context.registerService(ManagedServiceFactory.class, factory, props);
}
@Override
public void stop(BundleContext context) throws Exception {
registration.unregister();
factory.destroy();
}
}

View File

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.util.Dictionary;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.jboss.logging.Logger;
import org.osgi.framework.BundleContext;
public class ArtemisBrokerFactory extends BaseManagedServiceFactory<OsgiBroker, OsgiBroker> {
public static final String PID = "org.apache.activemq.artemis";
private static final Logger LOGGER = Logger.getLogger(ArtemisBrokerFactory.class);
public ArtemisBrokerFactory(BundleContext context) {
super(context, ArtemisBrokerFactory.class.getName());
}
@Override
protected OsgiBroker doCreate(Dictionary<String, ?> properties) throws Exception {
String config = getMandatory(properties, "config");
String name = getMandatory(properties, "name");
String domain = getMandatory(properties, "domain");
ActiveMQJAASSecurityManager security = new ActiveMQJAASSecurityManager(domain);
String serverInstanceDir = null;
String karafDataDir = System.getProperty("karaf.data");
if (karafDataDir != null) {
serverInstanceDir = karafDataDir + "/artemis/" + name;
}
OsgiBroker server = new OsgiBroker(getContext(), name, serverInstanceDir, config, security);
server.start();
return server;
}
private String getMandatory(Dictionary<String, ?> properties, String key) {
String value = (String) properties.get(key);
if (value == null) {
throw new IllegalStateException("Property " + key + " must be set");
}
return value;
}
@Override
protected void doDestroy(OsgiBroker broker) throws Exception {
broker.stop();
}
@Override
protected OsgiBroker register(OsgiBroker broker, Dictionary<String, ?> properties) {
broker.register(getContext(), properties);
return broker;
}
@Override
protected void unregister(OsgiBroker broker) {
broker.unregister();
}
@Override
protected void warn(String message, Throwable t) {
LOGGER.warn(message, t);
}
@Override
protected void info(String message, Throwable t) {
LOGGER.info(message, t);
}
}

View File

@ -1,195 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.osgi;
import java.util.Dictionary;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedServiceFactory;
public abstract class BaseManagedServiceFactory<T, R> implements ManagedServiceFactory {
public static final long DEFAULT_TIMEOUT_BEFORE_INTERRUPT = 30000;
private final BundleContext context;
private final String name;
private final long timeoutBeforeInterrupt;
private final AtomicBoolean destroyed;
private final ExecutorService executor;
private final Map<String, Pair<T, R>> services;
public BaseManagedServiceFactory(BundleContext context, String name) {
this(context, name, DEFAULT_TIMEOUT_BEFORE_INTERRUPT);
}
public BaseManagedServiceFactory(BundleContext context, String name, long timeoutBeforeInterrupt) {
this.context = context;
this.name = name;
this.timeoutBeforeInterrupt = timeoutBeforeInterrupt;
this.destroyed = new AtomicBoolean(false);
this.executor = Executors.newSingleThreadExecutor();
this.services = new ConcurrentHashMap<>();
}
public String getName() {
return name;
}
public BundleContext getContext() {
return context;
}
public void updated(final String pid, final Dictionary<String, ?> properties) throws ConfigurationException {
if (destroyed.get()) {
return;
}
executor.submit(new Runnable() {
public void run() {
try {
internalUpdate(pid, properties);
} catch (Throwable t) {
warn("Error destroying service for ManagedServiceFactory " + getName(), t);
}
}
});
}
public void deleted(final String pid) {
if (destroyed.get()) {
return;
}
executor.submit(new Runnable() {
public void run() {
try {
internalDelete(pid);
} catch (Throwable throwable) {
warn("Error destroying service for ManagedServiceFactory " + getName(), throwable);
}
}
});
}
private void internalUpdate(String pid, Dictionary<String, ?> properties) {
Pair<T, R> pair = services.remove(pid);
if (pair != null) {
internalDelete(pid);
}
if (destroyed.get()) {
return;
}
try {
T t = doCreate(properties);
try {
if (destroyed.get()) {
throw new IllegalStateException("ManagedServiceFactory has been destroyed");
}
R registration = register(t, properties);
services.put(pid, new Pair<>(t, registration));
} catch (Throwable throwable1) {
try {
doDestroy(t);
} catch (Throwable throwable2) {
// Ignore
}
throw throwable1;
}
} catch (Throwable throwable) {
warn("Error creating service for ManagedServiceFactory " + getName(), throwable);
}
}
private void internalDelete(String pid) {
Pair<T, R> pair = services.remove(pid);
if (pair != null) {
try {
unregister(pair.getSecond());
} catch (Throwable t) {
info("Error unregistering service", t);
}
try {
doDestroy(pair.getFirst());
} catch (Throwable t) {
info("Error destroying service", t);
}
}
}
protected abstract T doCreate(Dictionary<String, ?> properties) throws Exception;
protected abstract void doDestroy(T t) throws Exception;
protected abstract R register(T t, Dictionary<String, ?> properties);
protected abstract void unregister(R registration);
protected abstract void warn(String message, Throwable t);
protected abstract void info(String message, Throwable t);
public void destroy() {
if (destroyed.compareAndSet(false, true)) {
executor.shutdown();
try {
executor.awaitTermination(timeoutBeforeInterrupt, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Shutdown interrupted");
}
if (!executor.isTerminated()) {
executor.shutdownNow();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Shutdown interrupted");
}
}
while (!services.isEmpty()) {
String pid = services.keySet().iterator().next();
internalDelete(pid);
}
}
}
static class Pair<U,V> {
private U first;
private V second;
public Pair(U first, V second) {
this.first = first;
this.second = second;
}
public U getFirst() {
return first;
}
public V getSecond() {
return second;
}
public void setFirst(U first) {
this.first = first;
}
public void setSecond(V second) {
this.second = second;
}
}
}

View File

@ -36,43 +36,36 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.util.tracker.ServiceTracker;
@SuppressWarnings({"unchecked", "rawtypes"})
@Component(configurationPid="org.apache.activemq.artemis")
public class OsgiBroker {
private final String name;
private final String configurationUrl;
private final String brokerInstance;
private boolean started;
private final ActiveMQSecurityManager securityManager;
private String name;
private String configurationUrl;
private Map<String, ActiveMQComponent> components;
private Map<String, ServiceRegistration<?>> registrations;
private BundleContext context;
private ServiceTracker tracker;
public OsgiBroker(BundleContext context, String name, String brokerInstance, String configuration, ActiveMQSecurityManager security) {
this.context = context;
this.name = name;
this.brokerInstance = brokerInstance;
this.securityManager = security;
this.configurationUrl = configuration;
}
public synchronized void start() throws Exception {
if (tracker != null) {
return;
@Activate
public void activate(ComponentContext cctx) throws Exception {
BundleContext context = cctx.getBundleContext();
Dictionary<String, Object> properties = cctx.getProperties();
configurationUrl = getMandatory(properties, "config");
name = getMandatory(properties, "name");
String domain = getMandatory(properties, "domain");
ActiveMQJAASSecurityManager security = new ActiveMQJAASSecurityManager(domain);
String brokerInstance = null;
String karafDataDir = System.getProperty("karaf.data");
if (karafDataDir != null) {
brokerInstance = karafDataDir + "/artemis/" + name;
}
// todo if we start to pullout more configs from the main config then we
@ -86,7 +79,7 @@ public class OsgiBroker {
FileDeploymentManager fileDeploymentManager = new FileDeploymentManager(configurationUrl);
fileDeploymentManager.addDeployable(configuration).addDeployable(jmsConfiguration).readConfiguration();
components = fileDeploymentManager.buildService(securityManager, ManagementFactory.getPlatformMBeanServer());
components = fileDeploymentManager.buildService(security, ManagementFactory.getPlatformMBeanServer());
final ActiveMQServer server = (ActiveMQServer)components.get("core");
@ -129,9 +122,17 @@ public class OsgiBroker {
ProtocolTracker trackerCust = new ProtocolTracker(name, context, requiredProtocols, callback);
tracker = new ServiceTracker(context, ProtocolManagerFactory.class, trackerCust);
tracker.open();
started = true;
}
private String getMandatory(Dictionary<String, ?> properties, String key) {
String value = (String) properties.get(key);
if (value == null) {
throw new IllegalStateException("Property " + key + " must be set");
}
return value;
}
private String[] getRequiredProtocols(Set<TransportConfiguration> acceptors) {
ArrayList<String> protocols = new ArrayList<String>();
for (TransportConfiguration acceptor : acceptors) {
@ -143,16 +144,9 @@ public class OsgiBroker {
return protocols.toArray(new String[]{});
}
@Deactivate
public void stop() throws Exception {
if (!started) {
return;
}
tracker.close();
tracker = null;
}
public boolean isStarted() {
return started;
}
public Map<String, ActiveMQComponent> getComponents() {

View File

@ -32,6 +32,7 @@ public class ProtocolTrackerTest {
EasyMock.expectLastCall();
callback.removeFactory(protA.factory);
EasyMock.expectLastCall();
callback.stop();
EasyMock.expectLastCall();