ARTEMIS-601 Implementing reload manager on JMS Destinations and Address Settings
This commit is contained in:
parent
155a345d3c
commit
04d482037c
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.server.config;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
public interface JMSConfiguration {
|
||||
|
@ -35,4 +36,9 @@ public interface JMSConfiguration {
|
|||
String getDomain();
|
||||
|
||||
JMSConfiguration setDomain(String domain);
|
||||
|
||||
URL getConfigurationUrl();
|
||||
|
||||
JMSConfiguration setConfigurationUrl(URL configurationUrl);
|
||||
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ public class FileJMSConfiguration extends JMSConfigurationImpl implements Deploy
|
|||
@Override
|
||||
public void parse(Element config, URL url) throws Exception {
|
||||
parseConfiguration(config);
|
||||
setConfigurationUrl(url);
|
||||
parsed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.server.config.impl;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -35,6 +36,8 @@ public class JMSConfigurationImpl implements JMSConfiguration {
|
|||
|
||||
private String domain = ActiveMQDefaultConfiguration.getDefaultJmxDomain();
|
||||
|
||||
private URL configurationUrl;
|
||||
|
||||
// JMSConfiguration implementation -------------------------------
|
||||
|
||||
public JMSConfigurationImpl() {
|
||||
|
@ -83,4 +86,15 @@ public class JMSConfigurationImpl implements JMSConfiguration {
|
|||
this.domain = domain;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URL getConfigurationUrl() {
|
||||
return configurationUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JMSConfiguration setConfigurationUrl(URL configurationUrl) {
|
||||
this.configurationUrl = configurationUrl;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,11 @@ import javax.json.JsonArrayBuilder;
|
|||
import javax.json.JsonObject;
|
||||
import javax.naming.NamingException;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -59,6 +63,8 @@ import org.apache.activemq.artemis.core.server.QueueCreator;
|
|||
import org.apache.activemq.artemis.core.server.QueueDeleter;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -82,6 +88,7 @@ import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
|
|||
import org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration;
|
||||
import org.apache.activemq.artemis.jms.server.config.TopicConfiguration;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
|
||||
import org.apache.activemq.artemis.jms.server.management.JMSManagementService;
|
||||
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
|
||||
import org.apache.activemq.artemis.jms.server.management.impl.JMSManagementServiceImpl;
|
||||
|
@ -91,6 +98,8 @@ import org.apache.activemq.artemis.utils.JsonLoader;
|
|||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
import org.apache.activemq.artemis.utils.XMLUtil;
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
/**
|
||||
* A Deployer used to create and add to Bindings queues, topics and connection
|
||||
|
@ -204,6 +213,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
|||
// do not clear the cachedCommands - HORNETQ-1047
|
||||
|
||||
recoverBindings();
|
||||
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
active = false;
|
||||
|
@ -394,6 +405,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
|||
*/
|
||||
startCalled = true;
|
||||
server.start();
|
||||
ReloadManager reloadManager = server.getReloadManager();
|
||||
if (config != null && config.getConfigurationUrl() != null && reloadManager != null) {
|
||||
reloadManager.addCallback(config.getConfigurationUrl(), new JMSReloader());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
@ -1741,4 +1757,25 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class JMSReloader implements ReloadCallback {
|
||||
@Override
|
||||
public void reload(URL url) throws Exception {
|
||||
InputStream input = url.openStream();
|
||||
Reader reader = new InputStreamReader(input);
|
||||
String xml = XMLUtil.readerToString(reader);
|
||||
xml = XMLUtil.replaceSystemProps(xml);
|
||||
Element e = XMLUtil.stringToElement(xml);
|
||||
|
||||
if (config instanceof FileJMSConfiguration) {
|
||||
((FileJMSConfiguration)config).parse(e, url);
|
||||
|
||||
JMSServerManagerImpl.this.deploy();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
|||
import org.apache.activemq.artemis.core.server.impl.Activation;
|
||||
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
|
@ -379,6 +380,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
|||
* */
|
||||
void removeProtocolManagerFactory(ProtocolManagerFactory factory);
|
||||
|
||||
ReloadManager getReloadManager();
|
||||
|
||||
ActiveMQServer createBackupServer(Configuration configuration);
|
||||
|
||||
void addScaledDownNode(SimpleString scaledDownNodeId);
|
||||
|
|
|
@ -129,6 +129,9 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
|
|||
import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||
|
@ -244,6 +247,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
private MemoryManager memoryManager;
|
||||
|
||||
private ReloadManager reloadManager;
|
||||
|
||||
/**
|
||||
* This will be set by the JMS Queue Manager.
|
||||
*/
|
||||
|
@ -374,6 +379,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
this.serviceRegistry = serviceRegistry == null ? new ServiceRegistryImpl() : serviceRegistry;
|
||||
}
|
||||
|
||||
public ReloadManager getReloadManager() {
|
||||
return reloadManager;
|
||||
}
|
||||
|
||||
// life-cycle methods
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
|
@ -453,8 +462,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry);
|
||||
connectorsService.start();
|
||||
|
||||
this.reloadManager = new ReloadManagerImpl(getScheduledPool(), configuration.getConfigurationFileRefreshPeriod());
|
||||
|
||||
if (configuration.getConfigurationUrl() != null && getScheduledPool() != null) {
|
||||
getScheduledPool().scheduleWithFixedDelay(new ConfigurationFileReloader(this), configuration.getConfigurationFileRefreshPeriod(), configuration.getConfigurationFileRefreshPeriod(), TimeUnit.MILLISECONDS);
|
||||
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
|
||||
}
|
||||
}
|
||||
finally {
|
||||
|
@ -2358,38 +2369,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
}
|
||||
|
||||
private final class ConfigurationFileReloader implements Runnable {
|
||||
long lastModified;
|
||||
boolean first = true;
|
||||
ActiveMQServer server;
|
||||
|
||||
ConfigurationFileReloader(ActiveMQServer server) {
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
private final class ConfigurationFileReloader implements ReloadCallback {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
URL url = server.getConfiguration().getConfigurationUrl();
|
||||
long currentLastModified = new File(url.getPath()).lastModified();
|
||||
if (first) {
|
||||
first = false;
|
||||
lastModified = currentLastModified;
|
||||
return;
|
||||
}
|
||||
if (currentLastModified > lastModified) {
|
||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||
ActiveMQServerLogger.LOGGER.debug("Configuration file change detected. Reloading...");
|
||||
}
|
||||
Configuration config = new FileConfigurationParser().parseMainConfig(url.openStream());
|
||||
public void reload(URL uri) throws Exception {
|
||||
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
|
||||
securityRepository.swap(config.getSecurityRoles().entrySet());
|
||||
|
||||
lastModified = currentLastModified;
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.configurationReloadFailed(e);
|
||||
}
|
||||
addressSettingsRepository.swap(config.getAddressesSettings().entrySet());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.reload;
|
||||
|
||||
import java.net.URL;
|
||||
|
||||
public interface ReloadCallback {
|
||||
void reload(URL uri) throws Exception;
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.reload;
|
||||
|
||||
import java.net.URL;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
|
||||
public interface ReloadManager extends ActiveMQComponent {
|
||||
void addCallback(URL uri, ReloadCallback callback);
|
||||
|
||||
/** Callback for the next tick */
|
||||
void setTick(Runnable callback);
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.reload;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URL;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class ReloadManagerImpl implements ReloadManager {
|
||||
private static final Logger logger = Logger.getLogger(ReloadManagerImpl.class);
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private final long checkPeriod;
|
||||
private ScheduledFuture future;
|
||||
private volatile Runnable tick;
|
||||
|
||||
private Map<URL, ReloadRegistry> registry = new HashMap<>();
|
||||
|
||||
public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) {
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
this.checkPeriod = checkPeriod;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (future != null) {
|
||||
return;
|
||||
}
|
||||
future = scheduledExecutorService.scheduleWithFixedDelay(new ConfigurationFileReloader(), checkPeriod, checkPeriod, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setTick(Runnable tick) {
|
||||
this.tick = tick;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
if (future == null) {
|
||||
return; // no big deal
|
||||
}
|
||||
|
||||
future.cancel(false);
|
||||
future = null;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isStarted() {
|
||||
return future != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addCallback(URL uri, ReloadCallback callback) {
|
||||
if (future == null) {
|
||||
start();
|
||||
}
|
||||
ReloadRegistry uriRegistry = getRegistry(uri);
|
||||
uriRegistry.add(callback);
|
||||
}
|
||||
|
||||
private synchronized void tick() {
|
||||
for (ReloadRegistry item : registry.values()) {
|
||||
item.check();
|
||||
}
|
||||
|
||||
if (tick != null) {
|
||||
tick.run();
|
||||
tick = null;
|
||||
}
|
||||
}
|
||||
|
||||
private ReloadRegistry getRegistry(URL uri) {
|
||||
ReloadRegistry uriRegistry = registry.get(uri);
|
||||
if (uriRegistry == null) {
|
||||
uriRegistry = new ReloadRegistry(uri);
|
||||
registry.put(uri, uriRegistry);
|
||||
}
|
||||
|
||||
return uriRegistry;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private final class ConfigurationFileReloader implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
tick();
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.configurationReloadFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ReloadRegistry {
|
||||
private final File file;
|
||||
private final URL uri;
|
||||
private volatile long lastModified;
|
||||
|
||||
private final List<ReloadCallback> callbacks = new LinkedList<>();
|
||||
|
||||
ReloadRegistry(URL uri) {
|
||||
this.file = new File(uri.getPath());
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
public void check() {
|
||||
|
||||
long fileModified = file.lastModified();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Validating lastModified " + lastModified + " modified = " + fileModified + " on " + uri);
|
||||
}
|
||||
|
||||
if (lastModified > 0 && fileModified > lastModified) {
|
||||
|
||||
for (ReloadCallback callback : callbacks) {
|
||||
try {
|
||||
callback.reload(uri);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
ActiveMQServerLogger.LOGGER.configurationReloadFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.lastModified = fileModified;
|
||||
}
|
||||
|
||||
|
||||
public List<ReloadCallback> getCallbacks() {
|
||||
return callbacks;
|
||||
}
|
||||
|
||||
public void add(ReloadCallback callback) {
|
||||
callbacks.add(callback);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.reload;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReloadManagerTest extends ActiveMQTestBase {
|
||||
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
private ReloadManagerImpl manager;
|
||||
|
||||
@Before
|
||||
public void startScheduled() {
|
||||
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
|
||||
manager = new ReloadManagerImpl(scheduledExecutorService, 100);
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopScheduled() {
|
||||
manager.stop();
|
||||
scheduledExecutorService.shutdown();
|
||||
scheduledExecutorService = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate() throws Exception {
|
||||
|
||||
File file = new File(getTemporaryDir(), "checkFile.tst");
|
||||
internalTest(manager, file);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateWithSpace() throws Exception {
|
||||
File spaceDir = new File(getTemporaryDir(), "./with space");
|
||||
spaceDir.mkdirs();
|
||||
File file = new File(spaceDir, "checkFile.tst");
|
||||
internalTest(manager, file);
|
||||
}
|
||||
|
||||
private void internalTest(ReloadManagerImpl manager, File file) throws IOException, InterruptedException {
|
||||
file.createNewFile();
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
manager.addCallback(file.toURL(), new ReloadCallback() {
|
||||
@Override
|
||||
public void reload(URL uri) {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
file.setLastModified(System.currentTimeMillis());
|
||||
|
||||
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.jms;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class RedeployTest extends ActiveMQTestBase {
|
||||
|
||||
@Test
|
||||
public void testRedeploy() throws Exception {
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-jms.xml");
|
||||
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-updated-jms.xml");
|
||||
Files.copy(url1.openStream(), brokerXML);
|
||||
|
||||
EmbeddedJMS embeddedJMS = new EmbeddedJMS();
|
||||
embeddedJMS.setConfigResourcePath(brokerXML.toUri().toString());
|
||||
embeddedJMS.start();
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
Runnable tick = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
|
||||
try {
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("jms.queue.DLQ", embeddedJMS.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getDeadLetterAddress().toString());
|
||||
Assert.assertEquals("jms.queue.ExpiryQueue", embeddedJMS.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getExpiryAddress().toString());
|
||||
Assert.assertFalse(tryConsume());
|
||||
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
|
||||
latch.setCount(1);
|
||||
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertTrue(tryConsume());
|
||||
|
||||
Assert.assertEquals("jms.queue.NewQueue", embeddedJMS.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getDeadLetterAddress().toString());
|
||||
Assert.assertEquals("jms.queue.NewQueue", embeddedJMS.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getExpiryAddress().toString());
|
||||
|
||||
}
|
||||
finally {
|
||||
embeddedJMS.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean tryConsume() throws JMSException {
|
||||
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
|
||||
Queue queue = session.createQueue("NewQueue");
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
return true;
|
||||
}
|
||||
catch (JMSException e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<jms xmlns="urn:activemq:jms">
|
||||
<queue name="DLQ"/>
|
||||
<queue name="ExpiryQueue"/>
|
||||
|
||||
</jms>
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<auto-create-jms-queues>false</auto-create-jms-queues>
|
||||
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
|
||||
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<max-size-bytes>10485760</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>BLOCK</address-full-policy>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,114 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<jms xmlns="urn:activemq:jms">
|
||||
<queue name="DLQ"/>
|
||||
<queue name="ExpiryQueue"/>
|
||||
<queue name="NewQueue"/>
|
||||
|
||||
</jms>
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="a"/>
|
||||
<permission type="deleteNonDurableQueue" roles="a"/>
|
||||
<permission type="createDurableQueue" roles="a"/>
|
||||
<permission type="deleteDurableQueue" roles="a"/>
|
||||
<permission type="consume" roles="a"/>
|
||||
<permission type="browse" roles="a"/>
|
||||
<permission type="send" roles="a"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="a"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<auto-create-jms-queues>false</auto-create-jms-queues>
|
||||
<dead-letter-address>jms.queue.NewQueue</dead-letter-address>
|
||||
<expiry-address>jms.queue.NewQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<max-size-bytes>10485760</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>BLOCK</address-full-policy>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue