This closes #1167
This commit is contained in:
commit
57bae51e85
|
@ -46,6 +46,7 @@ cd expiry; mvn verify; cd ..
|
|||
cd http-transport; mvn verify; cd ..
|
||||
cd interceptor; mvn verify; cd ..
|
||||
cd interceptor-client; mvn verify; cd ..
|
||||
cd interceptor-client-mqtt; mvn verify; cd ..
|
||||
cd jms-auto-closeable; mvn verify; cd ..
|
||||
cd instantiate-connection-factory; mvn verify; cd ..
|
||||
cd jms-bridge; mvn verify; cd ..
|
||||
|
@ -132,5 +133,10 @@ cd scale-down; mvn verify; cd ..
|
|||
cd transaction-failover; mvn verify; cd ..
|
||||
|
||||
|
||||
|
||||
cd $ARTEMIS_HOME/examples/smoke/ha
|
||||
cd replicated-flowcontrol; mvn verify; cd ..
|
||||
|
||||
|
||||
cd $CURRENT_DIR
|
||||
rm -rf target
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.uri.ClusterConnectionConfigurationParser;
|
||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.utils.uri.URISupport;
|
||||
|
||||
public final class ClusterConnectionConfiguration implements Serializable {
|
||||
|
@ -375,14 +374,12 @@ public final class ClusterConnectionConfiguration implements Serializable {
|
|||
public TransportConfiguration[] getTransportConfigurations(Configuration configuration) throws Exception {
|
||||
|
||||
if (getCompositeMembers() != null) {
|
||||
ConnectorTransportConfigurationParser connectorTransportConfigurationParser = new ConnectorTransportConfigurationParser();
|
||||
|
||||
URI[] members = getCompositeMembers().getComponents();
|
||||
|
||||
List<TransportConfiguration> list = new LinkedList<>();
|
||||
|
||||
for (URI member : members) {
|
||||
list.addAll(connectorTransportConfigurationParser.newObject(member, null));
|
||||
list.addAll(ConfigurationUtils.parseConnectorURI(null, member));
|
||||
}
|
||||
|
||||
return list.toArray(new TransportConfiguration[list.size()]);
|
||||
|
|
|
@ -16,7 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.config;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ColocatedPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
|
@ -35,6 +39,8 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
|
|||
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
|
||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||
|
||||
public final class ConfigurationUtils {
|
||||
|
||||
|
@ -53,7 +59,8 @@ public final class ConfigurationUtils {
|
|||
throw new ActiveMQIllegalStateException("Missing cluster-configuration for replication-clustername '" + replicationCluster + "'.");
|
||||
}
|
||||
|
||||
public static HAPolicy getHAPolicy(HAPolicyConfiguration conf, ActiveMQServer server) throws ActiveMQIllegalStateException {
|
||||
public static HAPolicy getHAPolicy(HAPolicyConfiguration conf,
|
||||
ActiveMQServer server) throws ActiveMQIllegalStateException {
|
||||
if (conf == null) {
|
||||
return new LiveOnlyPolicy();
|
||||
}
|
||||
|
@ -131,6 +138,57 @@ public final class ConfigurationUtils {
|
|||
compareTTLWithCheckPeriod(configuration);
|
||||
}
|
||||
|
||||
public static List<TransportConfiguration> parseAcceptorURI(String name, String uri) {
|
||||
try {
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
|
||||
return configurations;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<TransportConfiguration> parseAcceptorURI(String name, URI uri) {
|
||||
try {
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(uri, name);
|
||||
|
||||
return configurations;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static List<TransportConfiguration> parseConnectorURI(String name, String uri) {
|
||||
|
||||
try {
|
||||
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
|
||||
return configurations;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<TransportConfiguration> parseConnectorURI(String name, URI uri) {
|
||||
try {
|
||||
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(uri, name);
|
||||
|
||||
return configurations;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static void compareTTLWithCheckPeriod(Configuration configuration) {
|
||||
for (ClusterConnectionConfiguration c : configuration.getClusterConfigurations())
|
||||
compareTTLWithCheckPeriod(c.getName(), c.getConnectionTTL(), configuration.getConnectionTTLOverride(), c.getClientFailureCheckPeriod());
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
|
@ -64,8 +65,6 @@ import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
|||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
|
||||
import org.apache.activemq.artemis.utils.uri.BeanSupport;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -493,10 +492,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
@Override
|
||||
public ConfigurationImpl addAcceptorConfiguration(final String name, final String uri) throws Exception {
|
||||
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
List<TransportConfiguration> configurations = ConfigurationUtils.parseAcceptorURI(name, uri);
|
||||
|
||||
for (TransportConfiguration config : configurations) {
|
||||
addAcceptorConfiguration(config);
|
||||
|
@ -531,9 +527,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
@Override
|
||||
public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
|
||||
|
||||
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
List<TransportConfiguration> configurations = ConfigurationUtils.parseConnectorURI(name, uri);
|
||||
|
||||
for (TransportConfiguration config : configurations) {
|
||||
addConnectorConfiguration(name, config);
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
|||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
|||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
|
@ -63,7 +65,6 @@ import org.apache.activemq.artemis.core.security.Role;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
|
@ -71,8 +72,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
|
||||
|
@ -995,9 +994,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String uri = e.getChildNodes().item(0).getNodeValue();
|
||||
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
List<TransportConfiguration> configurations = ConfigurationUtils.parseAcceptorURI(name, uri);
|
||||
|
||||
Map<String, Object> params = configurations.get(0).getParams();
|
||||
|
||||
|
@ -1020,9 +1017,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String uri = e.getChildNodes().item(0).getNodeValue();
|
||||
|
||||
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
List<TransportConfiguration> configurations = ConfigurationUtils.parseConnectorURI(name, uri);
|
||||
|
||||
Map<String, Object> params = configurations.get(0).getParams();
|
||||
|
||||
|
|
|
@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
@Override
|
||||
public void resumeCleanup() {
|
||||
this.cleanupEnabled = true;
|
||||
scheduleCleanup();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1123,6 +1123,8 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
@Override
|
||||
public Collection<Integer> getCurrentIds() throws Exception {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
List<Integer> ids = new ArrayList<>();
|
||||
if (fileFactory != null) {
|
||||
for (String fileName : fileFactory.listFiles("page")) {
|
||||
|
@ -1130,23 +1132,22 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
for (Integer id : pageIds) {
|
||||
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
|
||||
if (!sFile.exists()) {
|
||||
continue;
|
||||
}
|
||||
replicator.syncPages(sFile, id, getAddress());
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
|
||||
for (Integer id : pageIds) {
|
||||
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
|
||||
if (!sFile.exists()) {
|
||||
continue;
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
|
||||
replicator.syncPages(sFile, id, getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
}
|
||||
|
|
|
@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
stopReplication();
|
||||
throw e;
|
||||
} finally {
|
||||
pagingManager.resumeCleanup();
|
||||
// Re-enable compact and reclaim of journal files
|
||||
originalBindingsJournal.replicationSyncFinished();
|
||||
originalMessageJournal.replicationSyncFinished();
|
||||
pagingManager.resumeCleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
|
||||
|
@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
|
|||
*/
|
||||
private long fileId;
|
||||
private int dataSize;
|
||||
private ByteBuffer byteBuffer;
|
||||
private ByteBuf byteBuffer;
|
||||
private byte[] byteArray;
|
||||
private SimpleString pageStoreName;
|
||||
private FileType fileType;
|
||||
|
@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
|
|||
SimpleString storeName,
|
||||
long id,
|
||||
int size,
|
||||
ByteBuffer buffer) {
|
||||
ByteBuf buffer) {
|
||||
this();
|
||||
this.byteBuffer = buffer;
|
||||
this.pageStoreName = storeName;
|
||||
|
@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
|
|||
* (which might receive appends)
|
||||
*/
|
||||
if (dataSize > 0) {
|
||||
buffer.writeBytes(byteBuffer);
|
||||
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
|
||||
}
|
||||
|
||||
if (byteBuffer != null) {
|
||||
byteBuffer.release();
|
||||
byteBuffer = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
|||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
|
@ -68,7 +69,6 @@ import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
@ -227,9 +227,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
|
||||
@Override
|
||||
public Acceptor createAcceptor(String name, String uri) throws Exception {
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
|
||||
List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
|
||||
List<TransportConfiguration> configurations = ConfigurationUtils.parseAcceptorURI(name, uri);
|
||||
|
||||
return createAcceptor(configurations.get(0));
|
||||
}
|
||||
|
|
|
@ -411,7 +411,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
if (!channel1.isOpen()) {
|
||||
channel1.open();
|
||||
}
|
||||
channel1.writeDirect(ByteBuffer.wrap(data), true);
|
||||
channel1.writeDirect(ByteBuffer.wrap(data), false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,8 +25,11 @@ import java.util.Map;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
|
@ -122,6 +125,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
|
||||
private final ExecutorFactory executorFactory;
|
||||
|
||||
private final Executor replicationStream;
|
||||
|
||||
private SessionFailureListener failureListener;
|
||||
|
||||
private CoreRemotingConnection remotingConnection;
|
||||
|
@ -141,6 +146,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
this.executorFactory = executorFactory;
|
||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||
this.remotingConnection = remotingConnection;
|
||||
this.replicationStream = executorFactory.getExecutor();
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
|
@ -178,7 +184,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
boolean sync,
|
||||
final boolean lineUp) throws Exception {
|
||||
if (enabled) {
|
||||
sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
|
||||
sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -343,15 +349,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
}
|
||||
|
||||
private OperationContext sendReplicatePacket(final Packet packet) {
|
||||
return sendReplicatePacket(packet, true);
|
||||
return sendReplicatePacket(packet, true, true);
|
||||
}
|
||||
|
||||
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
|
||||
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
|
||||
if (!enabled)
|
||||
return null;
|
||||
boolean runItNow = false;
|
||||
|
||||
OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
|
||||
final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
|
||||
if (lineUp) {
|
||||
repliToken.replicationLineUp();
|
||||
}
|
||||
|
@ -359,10 +365,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
synchronized (replicationLock) {
|
||||
if (enabled) {
|
||||
pendingTokens.add(repliToken);
|
||||
if (!flowControl()) {
|
||||
return repliToken;
|
||||
}
|
||||
if (useExecutor) {
|
||||
replicationStream.execute(() -> {
|
||||
if (enabled) {
|
||||
flowControl();
|
||||
replicatingChannel.send(packet);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
flowControl();
|
||||
replicatingChannel.send(packet);
|
||||
}
|
||||
} else {
|
||||
// Already replicating channel failed, so just play the action now
|
||||
runItNow = true;
|
||||
|
@ -383,11 +396,12 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
* In case you refactor this in any way, this method must hold a lock on replication lock. .
|
||||
*/
|
||||
private boolean flowControl() {
|
||||
synchronized (replicationLock) {
|
||||
// synchronized (replicationLock) { -- I'm not adding this because the caller already has it
|
||||
// future maintainers of this code please be aware that the intention here is hold the lock on replication lock
|
||||
if (!replicatingChannel.getConnection().isWritable(this)) {
|
||||
try {
|
||||
logger.trace("flowControl waiting on writable");
|
||||
logger.trace("flowControl waiting on writable replication");
|
||||
writable.set(false);
|
||||
//don't wait for ever as this may hang tests etc, we've probably been closed anyway
|
||||
long now = System.currentTimeMillis();
|
||||
|
@ -396,7 +410,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
replicationLock.wait(deadline - now);
|
||||
now = System.currentTimeMillis();
|
||||
}
|
||||
logger.trace("flow control done");
|
||||
logger.trace("flow control done on replication");
|
||||
|
||||
if (!writable.get()) {
|
||||
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
|
||||
|
@ -412,6 +426,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
throw new ActiveMQInterruptedException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -515,7 +530,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
}
|
||||
SequentialFile file = jf.getFile().cloneFile();
|
||||
try {
|
||||
ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file);
|
||||
ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
|
||||
sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
|
||||
} finally {
|
||||
if (file.isOpen())
|
||||
|
@ -560,10 +575,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
// We can afford having a single buffer here for this entire loop
|
||||
// because sendReplicatePacket will encode the packet as a NettyBuffer
|
||||
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
|
||||
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
|
||||
int size = 1 << 17;
|
||||
while (true) {
|
||||
buffer.clear();
|
||||
final int bytesRead = channel.read(buffer);
|
||||
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
|
||||
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
|
||||
final int bytesRead = channel.read(byteBuffer);
|
||||
int toSend = bytesRead;
|
||||
if (bytesRead > 0) {
|
||||
if (bytesRead >= maxBytesToSend) {
|
||||
|
@ -572,12 +588,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
|
|||
} else {
|
||||
maxBytesToSend = maxBytesToSend - bytesRead;
|
||||
}
|
||||
buffer.limit(toSend);
|
||||
}
|
||||
buffer.rewind();
|
||||
|
||||
logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
|
||||
// sending -1 or 0 bytes will close the file at the backup
|
||||
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
|
||||
// We cannot simply send everything of a file through the executor,
|
||||
// otherwise we would run out of memory.
|
||||
// so we don't use the executor here
|
||||
sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
|
||||
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
|
@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
void backupServerSynched(ActiveMQServerImpl server);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void journalSynch(JournalFile jf, Long size, SequentialFile file);
|
||||
@Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format = Message.Format.MESSAGE_FORMAT)
|
||||
void replicaSyncFile(SequentialFile jf, Long size);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(
|
||||
|
|
|
@ -17,10 +17,10 @@
|
|||
|
||||
package org.apache.activemq.artemis.uri;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -28,8 +28,7 @@ public class AcceptorParserTest {
|
|||
|
||||
@Test
|
||||
public void testAcceptor() throws Exception {
|
||||
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
|
||||
List<TransportConfiguration> configs = parser.newObject(new URI("tcp://localhost:8080?tcpSendBufferSize=1048576&tcpReceiveBufferSize=1048576&protocols=openwire&banana=x"), "test");
|
||||
List<TransportConfiguration> configs = ConfigurationUtils.parseAcceptorURI("test", "tcp://localhost:8080?tcpSendBufferSize=1048576&tcpReceiveBufferSize=1048576&protocols=openwire&banana=x");
|
||||
|
||||
for (TransportConfiguration config : configs) {
|
||||
System.out.println("config:" + config);
|
||||
|
|
|
@ -78,7 +78,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -80,7 +80,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -92,7 +92,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -93,7 +93,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -87,7 +87,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -88,7 +88,7 @@ under the License.
|
|||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="jms.queue.exampleQueue"/>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
|
|
@ -96,6 +96,7 @@ under the License.
|
|||
<modules>
|
||||
<module>features</module>
|
||||
<module>protocols</module>
|
||||
<module>smoke</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples.smoke</groupId>
|
||||
<artifactId>smoke</artifactId>
|
||||
<version>2.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.activemq.examples.smoke.ha</groupId>
|
||||
<artifactId>smoke-ha</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<name>ActiveMQ Artemis Failover Examples</name>
|
||||
|
||||
<!-- Properties -->
|
||||
<properties>
|
||||
<!--
|
||||
Explicitly declaring the source encoding eliminates the following
|
||||
message: [WARNING] Using platform encoding (UTF-8 actually) to copy
|
||||
filtered resources, i.e. build is platform dependent!
|
||||
-->
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<activemq.basedir>${project.basedir}/../../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>release</id>
|
||||
<modules>
|
||||
<module>replicated-flowcontrol</module>
|
||||
</modules>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>examples</id>
|
||||
<modules>
|
||||
<module>replicated-flowcontrol</module>
|
||||
</modules>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,112 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples.smoke.ha</groupId>
|
||||
<artifactId>smoke-ha</artifactId>
|
||||
<version>2.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>replicated-flowcontrol</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Smoke test for replication, simulating large flow conrol</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-cli</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
<version>${qpid.jms.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create0</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- this makes it easier in certain envs -->
|
||||
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
|
||||
<instance>${basedir}/target/server0</instance>
|
||||
<configuration>${basedir}/target/classes/activemq/server0</configuration>
|
||||
<javaOptions>-Dudp-address=${udp-address}</javaOptions>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>create1</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- this makes it easier in certain envs -->
|
||||
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
|
||||
<instance>${basedir}/target/server1</instance>
|
||||
<configuration>${basedir}/target/classes/activemq/server1</configuration>
|
||||
<javaOptions>-Dudp-address=${udp-address}</javaOptions>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>runClient</id>
|
||||
<goals>
|
||||
<goal>runClient</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<clientClass>org.apache.activemq.artemis.jms.example.ReplicatedFailbackStaticSmoke</clientClass>
|
||||
<args>
|
||||
<param>${basedir}/target/server0</param>
|
||||
<param>${basedir}/target/server1</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.examples.smoke.ha</groupId>
|
||||
<artifactId>replicated-flowcontrol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,225 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.naming.InitialContext;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
|
||||
/**
|
||||
* Example of live and replicating backup pair.
|
||||
* <p>
|
||||
* After both servers are started, the live server is killed and the backup becomes active ("fails-over").
|
||||
* <p>
|
||||
* Later the live server is restarted and takes back its position by asking the backup to stop ("fail-back").
|
||||
*/
|
||||
public class ReplicatedFailbackStaticSmoke {
|
||||
|
||||
private static Process server0;
|
||||
|
||||
private static Process server1;
|
||||
|
||||
static final int NUM_MESSAGES = 300_000;
|
||||
static final int START_CONSUMERS = 100_000;
|
||||
static final int START_SERVER = 101_000;
|
||||
static final int KILL_SERVER = -1; // not killing the server right now.. just for future use
|
||||
static final int NUMBER_OF_CONSUMERS = 10;
|
||||
static final ReusableLatch latch = new ReusableLatch(NUM_MESSAGES);
|
||||
|
||||
static AtomicBoolean running = new AtomicBoolean(true);
|
||||
static AtomicInteger totalConsumed = new AtomicInteger(0);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
Connection connection = null;
|
||||
|
||||
InitialContext initialContext = null;
|
||||
|
||||
try {
|
||||
server0 = ServerUtil.startServer(args[0], ReplicatedFailbackStaticSmoke.class.getSimpleName() + "0", 0, 30000);
|
||||
|
||||
initialContext = new InitialContext();
|
||||
|
||||
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
|
||||
|
||||
connection = connectionFactory.createConnection();
|
||||
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
Queue queue = session.createQueue("exampleQueue");
|
||||
|
||||
connection.start();
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
BytesMessage bytesMessage = session.createBytesMessage();
|
||||
bytesMessage.writeBytes(new byte[20 * 1024]);
|
||||
|
||||
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||
|
||||
producer.send(bytesMessage);
|
||||
if (i % 1000 == 0) {
|
||||
System.out.println("Sent " + i + " messages, consumed=" + totalConsumed.get());
|
||||
session.commit();
|
||||
}
|
||||
|
||||
if (i == START_CONSUMERS) {
|
||||
System.out.println("Starting consumers");
|
||||
startConsumers();
|
||||
}
|
||||
|
||||
if (KILL_SERVER >= 0 && i == KILL_SERVER) {
|
||||
System.out.println("Killing server");
|
||||
ServerUtil.killServer(server0);
|
||||
}
|
||||
|
||||
if (i == START_SERVER) {
|
||||
System.out.println("Starting extra server");
|
||||
server1 = ServerUtil.startServer(args[1], ReplicatedFailbackStaticSmoke.class.getSimpleName() + "1", 1, 10000);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
System.out.println("Awaiting all consumers to finish");
|
||||
while (!latch.await(5, TimeUnit.SECONDS)) {
|
||||
System.out.println("Missing " + latch.getCount() + ", totalConsumed = " + totalConsumed);
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
||||
running.set(false);
|
||||
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
if (initialContext != null) {
|
||||
initialContext.close();
|
||||
}
|
||||
|
||||
ServerUtil.killServer(server0);
|
||||
ServerUtil.killServer(server1);
|
||||
}
|
||||
}
|
||||
|
||||
static void startConsumers() {
|
||||
for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
|
||||
Consumer consumer = new Consumer(i % 2 == 0, i);
|
||||
consumer.start();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static class Consumer extends Thread {
|
||||
|
||||
ConnectionFactory factory;
|
||||
Connection connection;
|
||||
Session session;
|
||||
Queue queue;
|
||||
MessageConsumer consumer;
|
||||
int count = 0;
|
||||
int totalCount = 0;
|
||||
|
||||
final int consumerID;
|
||||
|
||||
final boolean amqp;
|
||||
|
||||
Consumer(boolean amqp, int id) {
|
||||
super("amqp=" + amqp + ", id=" + id);
|
||||
this.amqp = amqp;
|
||||
this.consumerID = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Consumer " + consumerID + ", amqp::" + amqp;
|
||||
}
|
||||
|
||||
void connect() throws Exception {
|
||||
count = 0;
|
||||
if (amqp) {
|
||||
factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
} else {
|
||||
factory = new ActiveMQConnectionFactory(); // using default is fine here
|
||||
}
|
||||
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
queue = session.createQueue("exampleQueue");
|
||||
consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (running.get()) {
|
||||
try {
|
||||
if (connection == null) {
|
||||
connect();
|
||||
}
|
||||
|
||||
totalCount++;
|
||||
if (totalCount % 1000 == 0) {
|
||||
System.out.println(this + " received " + totalCount + " messages");
|
||||
}
|
||||
|
||||
BytesMessage message = (BytesMessage) consumer.receive(5000);
|
||||
if (message == null) {
|
||||
System.out.println("Consumer " + this + " couldn't get a message");
|
||||
if (count > 0) {
|
||||
session.commit();
|
||||
latch.countDown(count);
|
||||
totalConsumed.addAndGet(count);
|
||||
count = 0;
|
||||
}
|
||||
} else {
|
||||
count++;
|
||||
|
||||
if (count == 100) {
|
||||
session.commit();
|
||||
latch.countDown(count);
|
||||
totalConsumed.addAndGet(count);
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Giving up the loop " + this);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
|
||||
|
||||
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/largemessages</large-messages-directory>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
||||
<ha-policy>
|
||||
<replication>
|
||||
<master>
|
||||
<!--we need this for auto failback-->
|
||||
<check-for-live-server>true</check-for-live-server>
|
||||
</master>
|
||||
</replication>
|
||||
</ha-policy>
|
||||
|
||||
<connectors>
|
||||
<connector name="netty-connector">tcp://localhost:61616</connector>
|
||||
<connector name="netty-backup-connector">tcp://localhost:61617</connector>
|
||||
</connectors>
|
||||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="my-cluster">
|
||||
<connector-ref>netty-connector</connector-ref>
|
||||
<static-connectors>
|
||||
<connector-ref>netty-backup-connector</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
<!-- Other config -->
|
||||
|
||||
<security-settings>
|
||||
<!--security for example queue-->
|
||||
<security-setting match="exampleQueue">
|
||||
<permission roles="guest" type="createDurableQueue"/>
|
||||
<permission roles="guest" type="deleteDurableQueue"/>
|
||||
<permission roles="guest" type="createNonDurableQueue"/>
|
||||
<permission roles="guest" type="deleteNonDurableQueue"/>
|
||||
<permission roles="guest" type="consume"/>
|
||||
<permission roles="guest" type="send"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>200MB</max-size-bytes>
|
||||
<page-size-bytes>100MB</page-size-bytes>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,121 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
|
||||
|
||||
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/largemessages</large-messages-directory>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<cluster-user>exampleUser</cluster-user>
|
||||
|
||||
<cluster-password>secret</cluster-password>
|
||||
|
||||
<ha-policy>
|
||||
<replication>
|
||||
<slave>
|
||||
<allow-failback>true</allow-failback>
|
||||
<!-- not needed but tells the backup not to restart after failback as there will be > 0 backups saved -->
|
||||
<max-saved-replicated-journals-size>0</max-saved-replicated-journals-size>
|
||||
</slave>
|
||||
</replication>
|
||||
</ha-policy>
|
||||
|
||||
<!-- Connectors -->
|
||||
<connectors>
|
||||
<connector name="netty-live-connector">tcp://localhost:61616</connector>
|
||||
<connector name="netty-connector">tcp://localhost:61617</connector>
|
||||
</connectors>
|
||||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<cluster-connections>
|
||||
<cluster-connection name="my-cluster">
|
||||
<connector-ref>netty-connector</connector-ref>
|
||||
<static-connectors>
|
||||
<connector-ref>netty-live-connector</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
</cluster-connections>
|
||||
<!-- Other config -->
|
||||
|
||||
<security-settings>
|
||||
<!--security for example queue-->
|
||||
<security-setting match="exampleQueue">
|
||||
<permission roles="guest" type="createDurableQueue"/>
|
||||
<permission roles="guest" type="deleteDurableQueue"/>
|
||||
<permission roles="guest" type="createNonDurableQueue"/>
|
||||
<permission roles="guest" type="deleteNonDurableQueue"/>
|
||||
<permission roles="guest" type="consume"/>
|
||||
<permission roles="guest" type="send"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>200MB</max-size-bytes>
|
||||
<page-size-bytes>100MB</page-size-bytes>
|
||||
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="exampleQueue">
|
||||
<anycast>
|
||||
<queue name="exampleQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,20 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
|
||||
connectionFactory.ConnectionFactory=tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1
|
||||
queue.queue/exampleQueue=exampleQueue
|
|
@ -0,0 +1,60 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples</groupId>
|
||||
<artifactId>artemis-examples</artifactId>
|
||||
<version>2.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.activemq.examples.smoke</groupId>
|
||||
<artifactId>smoke</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<name>ActiveMQ Artemis Smoke Tests</name>
|
||||
|
||||
<!-- Properties -->
|
||||
<properties>
|
||||
<!--
|
||||
Explicitly declaring the source encoding eliminates the following
|
||||
message: [WARNING] Using platform encoding (UTF-8 actually) to copy
|
||||
filtered resources, i.e. build is platform dependent!
|
||||
-->
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>examples</id>
|
||||
<modules>
|
||||
<module>ha</module>
|
||||
</modules>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>release</id>
|
||||
<modules>
|
||||
<module>ha</module>
|
||||
</modules>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
@ -0,0 +1,8 @@
|
|||
# Smoke Tests
|
||||
|
||||
Everything under this folder is not meant for "educational" purposes.
|
||||
|
||||
These represent production-like scenarios.
|
||||
|
||||
In cases where a testcase may be too short lived, we use these smoke tests
|
||||
to validate production like scenarios
|
Loading…
Reference in New Issue