This commit is contained in:
Clebert Suconic 2019-08-02 13:26:16 -04:00
commit 6500ca0f86
12 changed files with 120 additions and 11 deletions

View File

@ -266,7 +266,7 @@ public class Create extends InputAbstract {
@Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
private boolean noJournalSync;
@Option(name = "--device-block-size", description = "The block size by the device, default at 4096.")
@Option(name = "--journal-device-block-size", description = "The block size by the device, default at 4096.")
private int journalDeviceBlockSize = 4096;
@Option(name = "--global-max-size", description = "Maximum amount of memory which message data may consume (Default: Undefined, half of the system's memory)")

View File

@ -29,7 +29,7 @@ import org.apache.activemq.artemis.cli.factory.BrokerFactory;
import org.apache.activemq.artemis.cli.factory.jmx.ManagementFactory;
import org.apache.activemq.artemis.cli.factory.security.SecurityManagerFactory;
import org.apache.activemq.artemis.components.ExternalComponent;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.artemis.dto.BrokerDTO;
import org.apache.activemq.artemis.dto.ComponentDTO;
@ -83,7 +83,7 @@ public class Run extends LockAbstract {
managementContext.start();
server.start();
server.getServer().registerActivateCallback(new ActivateCallback() {
server.getServer().registerActivateCallback(new CleaningActivateCallback() {
@Override
public void deActivate() {
try {

View File

@ -46,10 +46,10 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@ -97,7 +97,7 @@ import org.w3c.dom.NodeList;
* redeployed.
*/
@Deprecated
public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback {
public class JMSServerManagerImpl extends CleaningActivateCallback implements JMSServerManager {
private static final String REJECT_FILTER = Filter.GENERIC_IGNORED_FILTER;

View File

@ -42,4 +42,15 @@ public interface ActivateCallback {
*/
default void activationComplete() {
}
/*
* This is called when the broker is stopped (no shutdown in place)
*/
default void stop(ActiveMQServer server) {
}
default void shutdown(ActiveMQServer server) {
}
}

View File

@ -1269,6 +1269,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.analyzer = null;
}
for (ActivateCallback callback: activateCallbacks) {
if (isShutdown) {
callback.shutdown(this);
} else {
callback.stop(this);
}
}
if (identity != null) {
ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), tempNodeID, getUptime());
} else {

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
/** This is an abstract ActivateCallback that will cleanup itself when the broker is shutodwn */
public abstract class CleaningActivateCallback implements ActivateCallback {
public CleaningActivateCallback() {
}
@Override
public void stop(ActiveMQServer server) {
server.unregisterActivateCallback(this);
}
@Override
public void shutdown(ActiveMQServer server) {
server.unregisterActivateCallback(this);
}
}

View File

@ -107,7 +107,7 @@ public final class InVMNodeManager extends NodeManager {
public ActivateCallback startLiveNode() throws Exception {
state = FAILING_BACK;
liveLock.acquire();
return new ActivateCallback() {
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
try {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -429,7 +430,7 @@ public final class JdbcNodeManager extends NodeManager {
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
return new ActivateCallback() {
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
LOGGER.debug("ENTER activationComplete");

View File

@ -71,7 +71,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -82,6 +81,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
@ -553,7 +553,7 @@ public class ManagementServiceImpl implements ManagementService {
* Ensure the management notification address is created otherwise if auto-create-address = false then cluster
* bridges won't be able to connect.
*/
messagingServer.registerActivateCallback(new ActivateCallback() {
messagingServer.registerActivateCallback(new CleaningActivateCallback() {
@Override
public void activated() {
try {

View File

@ -480,7 +480,6 @@ public class BridgeReconnectTest extends BridgeTestBase {
ClientMessage r1 = cons1.receive(30000);
assertNotNull("received expected msg", r1);
assertEquals("property value matches", i, r1.getObjectProperty(propKey));
BridgeReconnectTest.log.info("got message " + r1.getObjectProperty(propKey));
}
BridgeReconnectTest.log.info("got messages");

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -194,7 +195,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
@Override
public ActivateCallback startLiveNode() throws Exception {
return new ActivateCallback() {
return new CleaningActivateCallback() {
};
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
/**
* A simple test-case used for documentation purposes.
*/
public class ActivationCallbackTest extends ActiveMQTestBase {
protected ActiveMQServer server;
@Test
public void callbackOnShutdown() throws Exception {
server = createServer(false, createDefaultNettyConfig());
final CountDownLatch latch = new CountDownLatch(1);
server.registerActivateCallback(new ActivateCallback() {
@Override
public void shutdown(ActiveMQServer server) {
latch.countDown();
}
});
server.start();
Assert.assertEquals(1, latch.getCount());
server.stop();
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
}