This closes #839
This commit is contained in:
commit
596f355b20
|
@ -81,6 +81,24 @@ public class JDBCUtils {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SQLProvider.Factory getSQLProviderFactory(String url) {
|
||||||
|
SQLProvider.Factory factory;
|
||||||
|
if (url.contains("derby")) {
|
||||||
|
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
|
||||||
|
factory = new DerbySQLProvider.Factory();
|
||||||
|
} else if (url.contains("postgres")) {
|
||||||
|
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
|
||||||
|
factory = new PostgresSQLProvider.Factory();
|
||||||
|
} else if (url.contains("mysql")) {
|
||||||
|
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
|
||||||
|
factory = new MySQLSQLProvider.Factory();
|
||||||
|
} else {
|
||||||
|
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
|
||||||
|
factory = new GenericSQLProvider.Factory();
|
||||||
|
}
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
|
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
|
||||||
SQLProvider.Factory factory;
|
SQLProvider.Factory factory;
|
||||||
if (driverClass.contains("derby")) {
|
if (driverClass.contains("derby")) {
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.osgi;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||||
|
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||||
|
import org.osgi.framework.BundleContext;
|
||||||
|
import org.osgi.framework.ServiceReference;
|
||||||
|
import org.osgi.util.tracker.ServiceTrackerCustomizer;
|
||||||
|
|
||||||
|
public class DataSourceTracker implements ServiceTrackerCustomizer<DataSource, DataSource> {
|
||||||
|
|
||||||
|
private static final Logger LOG = Logger.getLogger(ProtocolTracker.class.getName());
|
||||||
|
private final String name;
|
||||||
|
private final BundleContext context;
|
||||||
|
private final DatabaseStorageConfiguration dsc;
|
||||||
|
private final ServerTrackerCallBack callback;
|
||||||
|
|
||||||
|
public DataSourceTracker(String name, BundleContext context, DatabaseStorageConfiguration dsc,
|
||||||
|
ServerTrackerCallBack callback) {
|
||||||
|
this.name = name;
|
||||||
|
this.context = context;
|
||||||
|
this.dsc = dsc;
|
||||||
|
this.callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataSource addingService(ServiceReference<DataSource> reference) {
|
||||||
|
DataSource dataSource = context.getService(reference);
|
||||||
|
dsc.setDataSource(dataSource);
|
||||||
|
try (Connection conn = dataSource.getConnection()) {
|
||||||
|
dsc.setSqlProvider(JDBCUtils.getSQLProviderFactory(conn.getMetaData().getURL()));
|
||||||
|
} catch (SQLException ex) {
|
||||||
|
LOG.log(Level.WARNING, "Error getting dataSource provider infos", ex);
|
||||||
|
}
|
||||||
|
callback.setDataSourceDependency(false);
|
||||||
|
try {
|
||||||
|
callback.start();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.log(Level.WARNING, "Error starting broker " + name, ex);
|
||||||
|
}
|
||||||
|
return dataSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void modifiedService(ServiceReference<DataSource> reference, DataSource service) {
|
||||||
|
// not supported
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removedService(ServiceReference<DataSource> reference, DataSource service) {
|
||||||
|
callback.setDataSourceDependency(true);
|
||||||
|
try {
|
||||||
|
callback.stop();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.log(Level.WARNING, "Error stopping broker " + name, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,7 +31,10 @@ import java.util.Set;
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||||
|
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.StoreConfiguration.StoreType;
|
||||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
@ -57,6 +60,7 @@ public class OsgiBroker {
|
||||||
private Map<String, ActiveMQComponent> components;
|
private Map<String, ActiveMQComponent> components;
|
||||||
private Map<String, ServiceRegistration<?>> registrations;
|
private Map<String, ServiceRegistration<?>> registrations;
|
||||||
private ServiceTracker tracker;
|
private ServiceTracker tracker;
|
||||||
|
private ServiceTracker dataSourceTracker;
|
||||||
|
|
||||||
@Activate
|
@Activate
|
||||||
public void activate(ComponentContext cctx) throws Exception {
|
public void activate(ComponentContext cctx) throws Exception {
|
||||||
|
@ -92,42 +96,23 @@ public class OsgiBroker {
|
||||||
final ActiveMQServer server = (ActiveMQServer) components.get("core");
|
final ActiveMQServer server = (ActiveMQServer) components.get("core");
|
||||||
|
|
||||||
String[] requiredProtocols = getRequiredProtocols(server.getConfiguration().getAcceptorConfigurations());
|
String[] requiredProtocols = getRequiredProtocols(server.getConfiguration().getAcceptorConfigurations());
|
||||||
ProtocolTrackerCallBack callback = new ProtocolTrackerCallBack() {
|
ServerTrackerCallBack callback = new ServerTrackerCallBackImpl(server, context, properties);
|
||||||
|
|
||||||
@Override
|
StoreConfiguration storeConfiguration = server.getConfiguration().getStoreConfiguration();
|
||||||
public void addFactory(ProtocolManagerFactory<Interceptor> pmf) {
|
String dataSourceName = String.class.cast(properties.get("dataSourceName"));
|
||||||
server.addProtocolManagerFactory(pmf);
|
|
||||||
|
if (storeConfiguration != null &&
|
||||||
|
storeConfiguration.getStoreType() == StoreType.DATABASE && dataSourceName != null &&
|
||||||
|
!dataSourceName.isEmpty()) {
|
||||||
|
callback.setDataSourceDependency(true);
|
||||||
|
String filter = "(&(objectClass=javax.sql.DataSource)(osgi.jndi.service.name=" + dataSourceName + "))";
|
||||||
|
DataSourceTracker trackerCust =
|
||||||
|
new DataSourceTracker(name, context, DatabaseStorageConfiguration.class.cast(storeConfiguration),
|
||||||
|
(ServerTrackerCallBack) callback);
|
||||||
|
dataSourceTracker = new ServiceTracker(context, context.createFilter(filter), trackerCust);
|
||||||
|
dataSourceTracker.open();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void removeFactory(ProtocolManagerFactory<Interceptor> pmf) {
|
|
||||||
server.removeProtocolManagerFactory(pmf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() throws Exception {
|
|
||||||
ActiveMQComponent[] mqComponents = new ActiveMQComponent[components.size()];
|
|
||||||
components.values().toArray(mqComponents);
|
|
||||||
for (int i = mqComponents.length - 1; i >= 0; i--) {
|
|
||||||
mqComponents[i].stop();
|
|
||||||
}
|
|
||||||
unregister();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
List<ActiveMQComponent> componentsByStartOrder = getComponentsByStartOrder(components);
|
|
||||||
for (ActiveMQComponent component : componentsByStartOrder) {
|
|
||||||
component.start();
|
|
||||||
}
|
|
||||||
register(context, properties);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStarted() {
|
|
||||||
return server.isStarted();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
ProtocolTracker trackerCust = new ProtocolTracker(name, context, requiredProtocols, callback);
|
ProtocolTracker trackerCust = new ProtocolTracker(name, context, requiredProtocols, callback);
|
||||||
tracker = new ServiceTracker(context, ProtocolManagerFactory.class, trackerCust);
|
tracker = new ServiceTracker(context, ProtocolManagerFactory.class, trackerCust);
|
||||||
tracker.open();
|
tracker.open();
|
||||||
|
@ -160,6 +145,9 @@ public class OsgiBroker {
|
||||||
@Deactivate
|
@Deactivate
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
tracker.close();
|
tracker.close();
|
||||||
|
if (dataSourceTracker != null) {
|
||||||
|
dataSourceTracker.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, ActiveMQComponent> getComponents() {
|
public Map<String, ActiveMQComponent> getComponents() {
|
||||||
|
@ -220,4 +208,66 @@ public class OsgiBroker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ServerTrackerCallBackImpl implements ServerTrackerCallBack {
|
||||||
|
|
||||||
|
private volatile boolean dataSourceDependency = false;
|
||||||
|
|
||||||
|
private final ActiveMQServer server;
|
||||||
|
private final BundleContext context;
|
||||||
|
private final Dictionary<String, Object> properties;
|
||||||
|
|
||||||
|
ServerTrackerCallBackImpl(ActiveMQServer server, BundleContext context,
|
||||||
|
Dictionary<String, Object> properties) {
|
||||||
|
this.server = server;
|
||||||
|
this.context = context;
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addFactory(ProtocolManagerFactory<Interceptor> pmf) {
|
||||||
|
server.addProtocolManagerFactory(pmf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeFactory(ProtocolManagerFactory<Interceptor> pmf) {
|
||||||
|
server.removeProtocolManagerFactory(pmf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() throws Exception {
|
||||||
|
ActiveMQComponent[] mqComponents = new ActiveMQComponent[components.size()];
|
||||||
|
components.values().toArray(mqComponents);
|
||||||
|
for (int i = mqComponents.length - 1; i >= 0; i--) {
|
||||||
|
mqComponents[i].stop();
|
||||||
|
}
|
||||||
|
unregister();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() throws Exception {
|
||||||
|
if (!dataSourceDependency) {
|
||||||
|
List<ActiveMQComponent> componentsByStartOrder = getComponentsByStartOrder(components);
|
||||||
|
for (ActiveMQComponent component : componentsByStartOrder) {
|
||||||
|
component.start();
|
||||||
|
}
|
||||||
|
register(context, properties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStarted() {
|
||||||
|
return server.isStarted();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDataSourceDependency(boolean dataSourceDependency) {
|
||||||
|
this.dataSourceDependency = dataSourceDependency;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,12 +43,12 @@ public class ProtocolTracker implements ServiceTrackerCustomizer<ProtocolManager
|
||||||
private String name;
|
private String name;
|
||||||
private BundleContext context;
|
private BundleContext context;
|
||||||
private Map<String, Boolean> protocols;
|
private Map<String, Boolean> protocols;
|
||||||
private ProtocolTrackerCallBack callback;
|
private ServerTrackerCallBack callback;
|
||||||
|
|
||||||
public ProtocolTracker(String name,
|
public ProtocolTracker(String name,
|
||||||
BundleContext context,
|
BundleContext context,
|
||||||
String[] requiredProtocols,
|
String[] requiredProtocols,
|
||||||
ProtocolTrackerCallBack callback) {
|
ServerTrackerCallBack callback) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
|
|
|
@ -20,9 +20,12 @@ import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||||
|
|
||||||
public interface ProtocolTrackerCallBack extends ActiveMQComponent {
|
public interface ServerTrackerCallBack extends ActiveMQComponent {
|
||||||
|
|
||||||
void addFactory(ProtocolManagerFactory<Interceptor> pmf);
|
void addFactory(ProtocolManagerFactory<Interceptor> pmf);
|
||||||
|
|
||||||
void removeFactory(ProtocolManagerFactory<Interceptor> pmf);
|
void removeFactory(ProtocolManagerFactory<Interceptor> pmf);
|
||||||
|
|
||||||
|
void setDataSourceDependency(boolean dataSourceDependency);
|
||||||
|
|
||||||
}
|
}
|
|
@ -34,7 +34,7 @@ public class ProtocolTrackerTest {
|
||||||
IMocksControl c = EasyMock.createControl();
|
IMocksControl c = EasyMock.createControl();
|
||||||
BundleContext context = c.createMock(BundleContext.class);
|
BundleContext context = c.createMock(BundleContext.class);
|
||||||
String[] requiredProtocols = {"a", "b"};
|
String[] requiredProtocols = {"a", "b"};
|
||||||
ProtocolTrackerCallBack callback = c.createMock(ProtocolTrackerCallBack.class);
|
ServerTrackerCallBack callback = c.createMock(ServerTrackerCallBack.class);
|
||||||
|
|
||||||
RefFact protA = new RefFact(c, context, new String[]{"a"});
|
RefFact protA = new RefFact(c, context, new String[]{"a"});
|
||||||
RefFact protB = new RefFact(c, context, new String[]{"b"});
|
RefFact protB = new RefFact(c, context, new String[]{"b"});
|
||||||
|
|
|
@ -29,7 +29,7 @@ public class FileStorageConfiguration implements StoreConfiguration {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StoreType getStoreType() {
|
public StoreType getStoreType() {
|
||||||
return StoreType.DATABASE;
|
return StoreType.FILE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getMessageTableName() {
|
public String getMessageTableName() {
|
||||||
|
|
Loading…
Reference in New Issue