Clebert Suconic 2017-09-26 14:48:08 -04:00
commit b56950c351
36 changed files with 418 additions and 412 deletions

View File

@@ -93,4 +93,11 @@ public interface Packet {
    * @return true if confirmation is required
    */
   boolean isRequiresConfirmations();

+   /** The packet wasn't used because the stream is closed,
+    * this gives a chance to subclasses to cleanup anything that won't be used. */
+   default void release() {
+   }
 }
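As an illustration of the new hook (not part of this commit; the subclass and field names below are invented), a packet that wraps a pooled Netty buffer could override release() so the buffer is returned when the packet is discarded on a closed stream:

import io.netty.buffer.ByteBuf;

// Hypothetical subclass, for illustration only.
public class PooledBufferPacket extends PacketImpl {

   private ByteBuf payload;

   public PooledBufferPacket(byte type, ByteBuf payload) {
      super(type);
      this.payload = payload;
   }

   @Override
   public void release() {
      // Invoked when the packet is dropped because the stream is already closed.
      if (payload != null) {
         payload.release();
         payload = null;
      }
   }
}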

View File

@@ -354,6 +354,7 @@ public class PacketImpl implements Packet {
       return result;
    }

    @Override
    public boolean equals(Object obj) {
       if (this == obj) {

View File

@@ -52,8 +52,8 @@ public interface ActiveMQJMSServerLogger extends BasicLogger {
               format = Message.Format.MESSAGE_FORMAT)
    void serverCachingCommand(Object runnable);

-   @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 122005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
               format = Message.Format.MESSAGE_FORMAT)
    void invalidHostForConnector(String name, String newHost);

View File

@@ -35,9 +35,12 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;

 public abstract class AbstractSequentialFile implements SequentialFile {

+   private static final Logger logger = Logger.getLogger(AbstractSequentialFile.class);
+
    private File file;

    protected final File directory;
@@ -267,6 +270,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {

       @Override
       public void onError(final int errorCode, final String errorMessage) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
+         }
+
          final int size = delegates.size();
          for (int i = 0; i < size; i++) {
             try {

View File

@@ -33,12 +33,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.jboss.logging.Logger;

 /**
  * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
  */
 public abstract class AbstractSequentialFileFactory implements SequentialFileFactory {

+   private static final Logger logger = Logger.getLogger(AbstractSequentialFileFactory.class);
+
    // Timeout used to wait executors to shutdown
    protected static final int EXECUTOR_TIMEOUT = 60;
@@ -161,6 +164,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
    public void onIOError(Exception exception, String message, SequentialFile file) {
       if (critialErrorListener != null) {
          critialErrorListener.onIOException(exception, message, file);
+      } else {
+         logger.warn("Critical IO Error Called. No Critical IO Error Handler Registered");
       }
    }

View File

@@ -34,9 +34,12 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.jlibaio.LibaioFile;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;

 public class AIOSequentialFile extends AbstractSequentialFile {

+   private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class);
+
    private boolean opened = false;

    private LibaioFile aioFile;
@@ -114,6 +117,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {

    @Override
    public synchronized void fill(final int size) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Filling file: " + getFileName());
+      }
+
       checkOpened();
       aioFile.fill(size);
@@ -129,9 +136,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
    public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
       opened = true;

+      if (logger.isTraceEnabled()) {
+         logger.trace("Opening file: " + getFileName());
+      }
+
       try {
          aioFile = aioFactory.libaioContext.openFile(getFile(), factory.isDatasync());
       } catch (IOException e) {
+         logger.error("Error opening file: " + getFileName());
          factory.onIOError(e, e.getMessage(), this);
          throw new ActiveMQNativeIOError(e.getMessage(), e);
       }
@@ -156,6 +168,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
          // Sending it through the callback would make it released
          aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null));
       } catch (IOException e) {
+         logger.error("IOError reading file: " + getFileName(), e);
          factory.onIOError(e, e.getMessage(), this);
          throw new ActiveMQNativeIOError(e.getMessage(), e);
       }
@@ -176,6 +189,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {

    @Override
    public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Write Direct, Sync: " + sync + " File: " + getFileName());
+      }
+
       if (sync) {
          SimpleWaitIOCallback completion = new SimpleWaitIOCallback();

View File

@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -77,6 +79,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                                    final IOCriticalErrorListener listener) {
       super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener);
       callbackPool = new CallbackCache<>(maxIO);
+      if (logger.isTraceEnabled()) {
+         logger.trace("New AIO File Created");
+      }
    }

    public AIOSequentialCallback getCallback() {
@@ -304,7 +309,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
       try {
          libaioFile.write(position, bytes, buffer, this);
       } catch (IOException e) {
-         callback.onError(-1, e.getMessage());
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
          onIOError(e, "Failed to write to file", sequentialFile);
       }
    }
@@ -337,6 +342,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor

    @Override
    public void onError(int errno, String message) {
+      if (logger.isDebugEnabled()) {
+         logger.trace("AIO on error issued. Error(code: " + errno + " msg: " + message + ")");
+      }
       this.error = true;
       this.errorCode = errno;
       this.errorMessage = message;
@@ -357,6 +365,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
       if (error) {
          callback.onError(errorCode, errorMessage);
+         onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null);
          errorMessage = null;
       } else {
          if (callback != null) {
@@ -385,6 +394,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
                libaioContext.poll();
             } catch (Throwable e) {
                ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+               onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage(), null);
             }
          }
       }

View File

@@ -1671,38 +1671,15 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       ActiveMQConnectionFactory cf;
       List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();

-      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();

       Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();

-      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
-      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
-      String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();

       if (ha == null) {
          ha = ActiveMQClient.DEFAULT_IS_HA;
       }

-      if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
-         BroadcastEndpointFactory endpointFactory = null;
-         if (jgroupsLocatorClassName != null) {
-            String jchannelRefName = raProperties.getJgroupsChannelRefName();
-            JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
-            endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-         } else if (discoveryAddress != null) {
-            Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-            if (discoveryPort == null) {
-               discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-            }
-            String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-         } else if (jgroupsFileName != null) {
-            endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-         }
+      BroadcastEndpointFactory endpointFactory = this.createBroadcastEndpointFactory(overrideProperties);

+      if (endpointFactory != null) {
          Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
          if (refreshTimeout == null) {
             refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
@@ -1769,35 +1746,11 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       ActiveMQConnectionFactory cf;
       List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();

-      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
-      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
-      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();

       if (connectorClassName == null) {
-         BroadcastEndpointFactory endpointFactory = null;
-         if (discoveryAddress != null) {
-            Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-            if (discoveryPort == null) {
-               discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-            }
-            String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-         } else if (jgroupsFileName != null) {
-            endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-         } else {
-            String jgroupsLocatorClass = raProperties.getJgroupsChannelLocatorClass();
-            if (jgroupsLocatorClass != null) {
-               String jgroupsChannelRefName = raProperties.getJgroupsChannelRefName();
-               JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClass, jgroupsChannelRefName);
-               endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-            }
+         BroadcastEndpointFactory endpointFactory = this.createBroadcastEndpointFactory(overrideProperties);
          if (endpointFactory == null) {
             throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
          }
-         }

       Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
       if (refreshTimeout == null) {
@@ -1854,6 +1807,36 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       return cf;
    }

+   protected BroadcastEndpointFactory createBroadcastEndpointFactory(final ConnectionFactoryProperties overrideProperties) {
+
+      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
+      if (discoveryAddress != null) {
+         Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
+         if (discoveryPort == null) {
+            discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
+         }
+
+         String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
+         return new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
+      }
+
+      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
+
+      String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+      if (jgroupsLocatorClassName != null) {
+         String jchannelRefName = raProperties.getJgroupsChannelRefName();
+         JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
+         return new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
+      }
+
+      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
+      if (jgroupsFileName != null) {
+         return new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
+      }
+
+      return null;
+   }
+
    public Map<String, Object> overrideConnectionParameters(final Map<String, Object> connectionParams,
                                                            final Map<String, Object> overrideConnectionParams) {
       Map<String, Object> map = new HashMap<>();

View File

@@ -1546,7 +1546,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       if (largeServerMessage.getPendingRecordID() >= 0) {
          try {
             confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-            largeServerMessage.setPendingRecordID(-1);
+            largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }

View File

@@ -145,8 +145,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      if (fileSize % journalFF.getAlignment() != 0) {
-         int difference = fileSize % journalFF.getAlignment();
+      int modulus = fileSize % journalFF.getAlignment();
+      if (modulus != 0) {
+         int difference = modulus;
          int low = config.getJournalFileSize() - difference;
          int high = low + journalFF.getAlignment();
          fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
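For intuition, the rounding above picks whichever multiple of the alignment is nearest to the configured size. A small standalone sketch with made-up values (not from the commit):

// Illustrative only: round a configured size to the nearest alignment multiple.
int alignment = 4096;               // e.g. the journal's O_DIRECT alignment
int fileSize = 10_000;              // configured value, not aligned

int modulus = fileSize % alignment; // 10_000 % 4_096 = 1_808
if (modulus != 0) {
   int low = fileSize - modulus;    // 8_192
   int high = low + alignment;      // 12_288
   fileSize = modulus < alignment / 2 ? low : high;   // 1_808 < 2_048, so pick 8_192
}
// fileSize is now 8_192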
@@ -271,7 +272,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
                ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
             }
             if (replicator != null) {
-               replicator.largeMessageDelete(largeMsgId);
+               replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
             }
          }
          largeMessagesToDelete.clear();
@@ -374,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
      journalFF.releaseBuffer(buffer);
   }

-   public long storePendingLargeMessage(final long messageID) throws Exception {
+   public long storePendingLargeMessage(final long messageID, long recordID) throws Exception {
       readLock();
       try {
-         long recordID = generateID();
+         if (recordID == LargeServerMessage.NO_PENDING_ID) {
+            recordID = generateID();
+         } else {
+            //this means the large message doesn't
+            //have a pendingRecordID, but one has been
+            //generated (coming from live server) for use.
+            recordID = -recordID;
+         }

         messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
@@ -395,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
            // And the client won't be waiting for the actual file to be deleted.
            // We set a temporary record (short lived) on the journal
            // to avoid a situation where the server is restarted and pending large message stays on forever
-            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
+            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
         } catch (Exception e) {
            throw new ActiveMQInternalErrorException(e.getMessage(), e);
         }
@@ -426,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
      readLock();
      try {
         if (replicator != null) {
-            replicator.largeMessageDelete(largeServerMessage.getMessageID());
+            replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
         }

         file.delete();
@@ -474,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
        if (largeMessage.isDurable()) {
           // We store a marker on the journal that the large file is pending
-           long pendingRecordID = storePendingLargeMessage(id);
+           long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
           largeMessage.setPendingRecordID(pendingRecordID);
        }

View File

@@ -44,7 +44,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
    private final JournalStorageManager storageManager;

-   private long pendingRecordID = -1;
+   private long pendingRecordID = NO_PENDING_ID;

    private boolean paged;

View File

@@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
          storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
       }
    }
+
+   @Override
+   public void setPendingRecordID(long pendingRecordID) {
+      mainLM.setPendingRecordID(pendingRecordID);
+   }
+
+   @Override
+   public long getPendingRecordID() {
+      return mainLM.getPendingRecordID();
+   }
 }

View File

@@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class ReplicationLargeMessageEndMessage extends PacketImpl {

    long messageId;
+   long pendingRecordId;

    public ReplicationLargeMessageEndMessage() {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
    }

-   public ReplicationLargeMessageEndMessage(final long messageId) {
+   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
       this();
       this.messageId = messageId;
+      //we use negative value to indicate that this id is pre-generated by live node
+      //so that it won't be generated at backup.
+      //see https://issues.apache.org/jira/browse/ARTEMIS-1221
+      this.pendingRecordId = -pendingRecordId;
    }

    @Override
    public int expectedEncodeSize() {
       return PACKET_HEADERS_SIZE +
-         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+         DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
+         DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
    }

    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
+      buffer.writeLong(pendingRecordId);
    }

    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       messageId = buffer.readLong();
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         pendingRecordId = buffer.readLong();
+      }
    }

    /**
@@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
         return false;
      return true;
   }
+
+   public long getPendingRecordId() {
+      return pendingRecordId;
+   }
 }
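A rough trace of the sign convention introduced here (values invented; the negation is undone in storePendingLargeMessage, shown earlier in this diff, where a value other than NO_PENDING_ID is flipped back and no new id is generated):

// Illustrative only.
long pendingRecordIdOnLive = 1234L;                // id generated by the live broker
long valueOnTheWire = -pendingRecordIdOnLive;      // END packet carries -1234 (pre-generated marker)
long recordIdUsedOnBackup = -valueOnTheWire;       // backup restores 1234 instead of generating a new id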

View File

@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       if (dataSize > 0) {
          buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
+
+      release();
+   }
+
+   @Override
+   public void release() {
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
+      }
    }

    @Override

View File

@@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
    */
   void addBytes(byte[] body) throws Exception;
+
+   void setPendingRecordID(long pendingRecordID);
+
+   long getPendingRecordID();
 }

View File

@@ -519,6 +519,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
       final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
       if (message != null) {
+         message.setPendingRecordID(packet.getPendingRecordId());
          executor.execute(new Runnable() {
            @Override
            public void run() {

View File

@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import io.netty.buffer.ByteBuf;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@@ -91,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent {
       public boolean toBoolean() {
          return true;
       }
-   },
-   ADD {
+   }, ADD {
       @Override
       public boolean toBoolean() {
          return false;
@@ -128,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent {

    private final long timeout;

+   private final long initialReplicationSyncTimeout;
+
    private volatile boolean inSync = true;

    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -137,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent {
    */
    public ReplicationManager(CoreRemotingConnection remotingConnection,
                              final long timeout,
+                             final long initialReplicationSyncTimeout,
                              final ExecutorFactory executorFactory) {
       this.executorFactory = executorFactory;
+      this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
       this.replicationStream = executorFactory.getExecutor();
@@ -177,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent {
                            boolean sync,
                            final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
@@ -238,9 +243,11 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
    }

-   public void largeMessageDelete(final Long messageId) {
+   //we pass in storageManager to generate ID only if enabled
+   public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
       if (enabled) {
-         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
+         long pendingRecordID = storageManager.generateID();
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
       }
    }
@@ -336,43 +343,30 @@ public final class ReplicationManager implements ActiveMQComponent {
    }

    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true, true);
+      return sendReplicatePacket(packet, true);
    }

-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
-      if (!enabled)
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+      if (!enabled) {
+         packet.release();
          return null;
-      boolean runItNow = false;
+      }

       final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
       if (lineUp) {
          repliToken.replicationLineUp();
       }

-      if (enabled) {
-         if (useExecutor) {
-            replicationStream.execute(() -> {
-               if (enabled) {
-                  pendingTokens.add(repliToken);
-                  flowControl(packet.expectedEncodeSize());
-                  replicatingChannel.send(packet);
-               }
-            });
-         } else {
-            pendingTokens.add(repliToken);
-            flowControl(packet.expectedEncodeSize());
-            replicatingChannel.send(packet);
-         }
-      } else {
-         // Already replicating channel failed, so just play the action now
-         runItNow = true;
-      }
-
-      // Execute outside lock
-      if (runItNow) {
-         repliToken.replicationDone();
-      }
+      replicationStream.execute(() -> {
+         if (enabled) {
+            pendingTokens.add(repliToken);
+            flowControl(packet.expectedEncodeSize());
+            replicatingChannel.send(packet);
+         } else {
+            packet.release();
+            repliToken.replicationDone();
+         }
+      });

       return repliToken;
    }
@@ -393,7 +387,6 @@ public final class ReplicationManager implements ActiveMQComponent {
          }
       }
       return flowWorked;
    }
@@ -508,6 +501,24 @@ public final class ReplicationManager implements ActiveMQComponent {
       sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }

+   private class FlushAction implements Runnable {
+
+      ReusableLatch latch = new ReusableLatch(1);
+
+      public void reset() {
+         latch.setCount(1);
+      }
+
+      public boolean await(long timeout, TimeUnit unit) throws Exception {
+         return latch.await(timeout, unit);
+      }
+
+      @Override
+      public void run() {
+         latch.countDown();
+      }
+   }
+
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
     *
@@ -529,15 +540,19 @@ public final class ReplicationManager implements ActiveMQComponent {
          file.open();
       }
       int size = 32 * 1024;
+
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+      int flowControlSize = 10;
+
+      int packetsSent = 0;
+      FlushAction action = new FlushAction();
+
       try {
-         try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
-              final FileChannel channel = fis.getChannel()) {
+         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {

             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
             while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
                buffer.clear();
                ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                final int bytesRead = channel.read(byteBuffer);
@@ -555,18 +570,31 @@ public final class ReplicationManager implements ActiveMQComponent {
               // We cannot simply send everything of a file through the executor,
               // otherwise we would run out of memory.
               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               packetsSent++;
+
+               if (packetsSent % flowControlSize == 0) {
+                  flushReplicationStream(action);
+               }
               if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                  break;
            }
         }
+         flushReplicationStream(action);
      } finally {
+         buffer.release();
         if (file.isOpen())
            file.close();
      }
   }

+   private void flushReplicationStream(FlushAction action) throws Exception {
+      action.reset();
+      replicationStream.execute(action);
+      if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+         throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+      }
+   }
+
   /**
    * Reserve the following fileIDs in the backup server.
    *

View File

@@ -22,13 +22,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;

 public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {

+   long NO_PENDING_ID = -1;
+
    @Override
    void addBytes(byte[] bytes) throws Exception;

-   void setPendingRecordID(long pendingRecordID);
-
-   long getPendingRecordID();
-
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged

View File

@@ -472,7 +472,7 @@ public final class ClusterManager implements ActiveMQComponent {
       clusterLocators.add(serverLocator);

-      Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server.getStorageManager());
+      Bridge bridge = new BridgeImpl(serverLocator, config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getReconnectAttemptsOnSameNode(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), nodeManager.getUUID(), new SimpleString(config.getName()), queue, executorFactory.getExecutor(), FilterImpl.createFilter(config.getFilterString()), SimpleString.toSimpleString(config.getForwardingAddress()), scheduledExecutor, transformer, config.isUseDuplicateDetection(), config.getUser(), config.getPassword(), server);

       bridges.put(config.getName(), bridge);

View File

@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -156,6 +156,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    private boolean keepConnecting = true;

+   private ActiveMQServer server;
+
    public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final int initialConnectAttempts,
                      final int reconnectAttempts,
@@ -174,7 +176,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                      final boolean useDuplicateDetection,
                      final String user,
                      final String password,
-                     final StorageManager storageManager) {
+                     final ActiveMQServer server) {

       this.reconnectAttempts = reconnectAttempts;
@@ -211,6 +213,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       this.user = user;

       this.password = password;
+
+      this.server = server;
    }

    public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
@@ -603,7 +607,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled

    @Override
    public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
+      if (server.isStarted()) {
          ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
+      }

       synchronized (connectionGuard) {
          keepConnecting = true;

View File

@@ -37,8 +37,8 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -100,13 +100,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                   final boolean useDuplicateDetection,
                                   final String user,
                                   final String password,
-                                  final StorageManager storageManager,
+                                  final ActiveMQServer server,
                                   final SimpleString managementAddress,
                                   final SimpleString managementNotificationAddress,
                                   final MessageFlowRecord flowRecord,
                                   final TransportConfiguration connector) {
       super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
-            retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, storageManager);
+            retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, server);

       this.discoveryLocator = discoveryLocator;

View File

@@ -800,7 +800,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));

       MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);

-      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server.getStorageManager(), managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector());
+      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server, managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector());

       targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;

+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.jboss.logging.Logger;

@@ -45,14 +46,17 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
    private final Set<FileStore> stores = new HashSet<>();
    private double maxUsage;
    private final Object monitorLock = new Object();
+   private final IOCriticalErrorListener ioCriticalErrorListener;

    public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
                            Executor executor,
                            long checkPeriod,
                            TimeUnit timeUnit,
-                           double maxUsage) {
+                           double maxUsage,
+                           IOCriticalErrorListener ioCriticalErrorListener) {
       super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
       this.maxUsage = maxUsage;
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
    }

    public FileStoreMonitor addCallback(Callback callback) {
@@ -99,6 +103,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
                if (over) {
                   break;
                }
+            } catch (IOException ioe) {
+               ioCriticalErrorListener.onIOException(ioe, "IO Error while calculating disk usage", null);
             } catch (Exception e) {
                logger.warn(e.getMessage(), e);
             }
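With the extra constructor argument, callers now supply an IOCriticalErrorListener up front. A minimal wiring sketch, assuming the listener interface's single onIOException method makes it usable as a lambda and with broker-class imports omitted (inside the broker the shutdownOnCriticalIO listener is passed instead, as seen further down in this diff):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative wiring only.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
ExecutorService executor = Executors.newSingleThreadExecutor();

IOCriticalErrorListener listener =
      (exception, message, file) -> System.err.println("Critical IO failure: " + message);

FileStoreMonitor monitor = new FileStoreMonitor(
      scheduler, executor,
      100, TimeUnit.MILLISECONDS,   // scan period (example value)
      0.9,                          // warn above 90% usage (example value)
      listener);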

View File

@@ -2127,7 +2127,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }

       try {
-         injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
+         injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
       }

View File

@@ -33,11 +33,13 @@ public class FileLockNodeManager extends NodeManager {

    private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);

-   private static final int LIVE_LOCK_POS = 1;
-   private static final int BACKUP_LOCK_POS = 2;
-   private static final int LOCK_LENGTH = 1;
+   private static final long STATE_LOCK_POS = 0;
+
+   private static final long LIVE_LOCK_POS = 1;
+
+   private static final long BACKUP_LOCK_POS = 2;
+
+   private static final long LOCK_LENGTH = 1;

    private static final byte LIVE = 'L';
@@ -113,6 +115,7 @@ public class FileLockNodeManager extends NodeManager {

    @Override
    public void awaitLiveNode() throws Exception {
+      logger.debug("awaiting live node...");
       do {
          byte state = getState();
          while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
@@ -228,25 +231,52 @@ public class FileLockNodeManager extends NodeManager {
    * @param status
    * @throws IOException
    */
-   private void writeFileLockStatus(byte status) throws IOException {
+   private void writeFileLockStatus(byte status) throws Exception {
      if (replicatedBackup && channel == null)
         return;
+      logger.debug("writing status: " + status);
      ByteBuffer bb = ByteBuffer.allocateDirect(1);
      bb.put(status);
      bb.position(0);
+
+      if (!channel.isOpen()) {
+         setUpServerLockFile();
+      }
+      FileLock lock = null;
+      try {
+         lock = lock(STATE_LOCK_POS);
         channel.write(bb, 0);
         channel.force(true);
+      } finally {
+         if (lock != null) {
+            lock.release();
+         }
+      }
   }

   private byte getState() throws Exception {
+      byte result;
+      logger.debug("getting state...");
      ByteBuffer bb = ByteBuffer.allocateDirect(1);
      int read;
+      FileLock lock = null;
+      try {
+         lock = lock(STATE_LOCK_POS);
         read = channel.read(bb, 0);
         if (read <= 0) {
-            return FileLockNodeManager.NOT_STARTED;
+            result = FileLockNodeManager.NOT_STARTED;
         } else {
-            return bb.get(0);
+            result = bb.get(0);
         }
+      } finally {
+         if (lock != null) {
+            lock.release();
+         }
+      }
+      logger.debug("state: " + result);
+      return result;
   }

   @Override
@@ -263,25 +293,27 @@ public class FileLockNodeManager extends NodeManager {
      return getNodeId();
   }

-   protected FileLock tryLock(final int lockPos) throws Exception {
+   protected FileLock tryLock(final long lockPos) throws IOException {
      try {
-         return channel.tryLock(lockPos, LOCK_LENGTH, false);
+         logger.debug("trying to lock position: " + lockPos);
+         FileLock lock = channel.tryLock(lockPos, LOCK_LENGTH, false);
+         if (lock != null) {
+            logger.debug("locked position: " + lockPos);
+         } else {
+            logger.debug("failed to lock position: " + lockPos);
+         }
+         return lock;
      } catch (java.nio.channels.OverlappingFileLockException ex) {
         // This just means that another object on the same JVM is holding the lock
         return null;
      }
   }

-   protected FileLock lock(final int liveLockPos) throws Exception {
+   protected FileLock lock(final long lockPosition) throws Exception {
      long start = System.currentTimeMillis();

      while (!interrupted) {
-         FileLock lock = null;
-         try {
-            lock = channel.tryLock(liveLockPos, 1, false);
-         } catch (java.nio.channels.OverlappingFileLockException ex) {
-            // This just means that another object on the same JVM is holding the lock
-         }
+         FileLock lock = tryLock(lockPosition);

         if (lock == null) {
            try {
@@ -302,7 +334,7 @@ public class FileLockNodeManager extends NodeManager {
      // need to investigate further and review
      FileLock lock;
      do {
-         lock = channel.tryLock(liveLockPos, 1, false);
+         lock = tryLock(lockPosition);
        if (lock == null) {
           try {
              Thread.sleep(500);

View File

@@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
            ReplicationFailureListener listener = new ReplicationFailureListener();
            rc.addCloseListener(listener);
            rc.addFailureListener(listener);
-            replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+            replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
            replicationManager.start();
            Thread t = new Thread(new Runnable() {
               @Override

View File

@@ -96,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
       };
       final AtomicBoolean fakeReturn = new AtomicBoolean(false);
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) {
          @Override
          protected double calculateUsage(FileStore store) throws IOException {
             if (fakeReturn.get()) {
@@ -127,7 +127,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {

    @Test
    public void testScheduler() throws Exception {
-      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
+      FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null);

       final ReusableLatch latch = new ReusableLatch(5);
       storeMonitor.addStore(getTestDirfile());

View File

@ -56,8 +56,8 @@ public interface ActiveMQXARecoveryLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void serverCachingCommand(Object runnable); void serverCachingCommand(Object runnable);
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.WARN)
@Message(id = 121005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.", @Message(id = 122005, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\". If this new address is incorrect please manually configure the connector to use the proper one.",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void invalidHostForConnector(String name, String newHost); void invalidHostForConnector(String name, String newHost);
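For readers tracing the id/level change above: Artemis log bundles are annotated interfaces that the jboss-logging processor turns into implementations at build time, with the project code and numeric id prefixed to every entry. A rough sketch follows; the project code, message id and annotation package are assumptions for illustration and may differ from the real Artemis bundles depending on the jboss-logging version in use.

import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;

// Sketch only: the annotation processor generates the implementation and renders
// each entry as "<projectCode><id> <formatted message>", e.g. "EXA999001 ...".
@MessageLogger(projectCode = "EXA")
public interface ExampleLogger extends BasicLogger {

   ExampleLogger LOGGER = Logger.getMessageLogger(ExampleLogger.class, ExampleLogger.class.getPackage().getName());

   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 999001, value = "Invalid \"host\" value \"0.0.0.0\" detected for \"{0}\" connector. Switching to \"{1}\".",
      format = Message.Format.MESSAGE_FORMAT)
   void invalidHostForConnector(String name, String newHost);
}

Raising the level from INFO to WARN generally makes the entry visible under default console log configurations, which is the user-facing effect of the change above.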

View File

@ -65,7 +65,11 @@ public class XARecoveryConfig {
final ClientProtocolManagerFactory clientProtocolManager) { final ClientProtocolManagerFactory clientProtocolManager) {
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length]; TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) { for (int i = 0; i < transportConfiguration.length; i++) {
if (clientProtocolManager != null) {
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig("")); newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
} else {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
} }
this.transportConfiguration = newTransportConfiguration; this.transportConfiguration = newTransportConfiguration;
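The new branch above simply falls back to a plain copy when no protocol manager factory is supplied, instead of dereferencing null. A standalone sketch of that copy-and-adapt loop is below; the helper class and the import path for ClientProtocolManagerFactory are assumptions for illustration, while newTransportConfig and adaptTransportConfiguration are the calls already visible in the hunk.

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;

// Sketch only: copy each TransportConfiguration, letting the protocol manager adapt
// the copy when one is present, otherwise keeping the plain copy as-is.
public final class TransportConfigCopySketch {

   private TransportConfigCopySketch() {
   }

   public static TransportConfiguration[] copy(TransportConfiguration[] source,
                                               ClientProtocolManagerFactory clientProtocolManager) {
      TransportConfiguration[] result = new TransportConfiguration[source.length];
      for (int i = 0; i < source.length; i++) {
         TransportConfiguration plainCopy = source[i].newTransportConfig("");
         result[i] = clientProtocolManager != null ? clientProtocolManager.adaptTransportConfiguration(plainCopy) : plainCopy;
      }
      return result;
   }
}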

View File

@ -167,6 +167,14 @@
<geronimo-annotation_1.2_spec.version>1.0</geronimo-annotation_1.2_spec.version> <geronimo-annotation_1.2_spec.version>1.0</geronimo-annotation_1.2_spec.version>
</properties> </properties>
<licenses>
<license>
<name>Apache License 2.0</name>
<url>http://repository.jboss.org/licenses/apache-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm> <scm>
<connection>scm:git:http://git-wip-us.apache.org/repos/asf/activemq-artemis.git</connection> <connection>scm:git:http://git-wip-us.apache.org/repos/asf/activemq-artemis.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/activemq-artemis.git</developerConnection> <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/activemq-artemis.git</developerConnection>

View File

@ -1,257 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(BMUnitRunner.class)
public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
public static int messageChunkCount = 0;
private static final ReusableLatch ruleFired = new ReusableLatch(1);
private static ActiveMQServer backupServer;
private static ActiveMQServer liveServer;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
ActiveMQConnection connection;
Session session;
Queue queue;
MessageProducer producer;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
ruleFired.setCount(1);
messageChunkCount = 0;
TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
Configuration backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true));
Configuration liveConfig = createDefaultInVMConfig();
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
liveServer = createServer(liveConfig);
liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
liveServer.start();
waitForServerToStart(liveServer);
backupServer = createServer(backupConfig);
backupServer.start();
waitForServerToStart(backupServer);
// Just to make sure the expression worked
Assert.assertEquals(10000, factory.getMinLargeMessageSize());
Assert.assertEquals(10000, factory.getProducerWindowSize());
Assert.assertEquals(100, factory.getRetryInterval());
Assert.assertEquals(-1, factory.getReconnectAttempts());
Assert.assertTrue(factory.isHA());
connection = (ActiveMQConnection) factory.createConnection();
waitForRemoteBackup(connection.getSessionFactory(), 30);
session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue("Queue");
producer = session.createProducer(queue);
}
@After
public void stopServers() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
}
}
if (backupServer != null) {
backupServer.stop();
backupServer = null;
}
if (liveServer != null) {
liveServer.stop();
liveServer = null;
}
backupServer = liveServer = null;
}
/*
* simple test to induce a potential race condition where the server's acceptors are active, but the server's
* state != STARTED
*/
@Test
@BMRules(
rules = {@BMRule(
name = "InterruptSending",
targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
targetMethod = "sendLargeMessageChunk",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkSent();")})
public void testSendLargeMessage() throws Exception {
MapMessage message = createLargeMessage();
try {
producer.send(message);
Assert.fail("expected an exception");
// session.commit();
} catch (JMSException expected) {
}
session.rollback();
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
MapMessage messageRec = (MapMessage) consumer.receive(5000);
Assert.assertNotNull(messageRec);
for (int i = 0; i < 10; i++) {
Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
}
}
@Test
@BMRules(
rules = {@BMRule(
name = "InterruptReceive",
targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback",
targetMethod = "sendLargeMessageContinuation",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOverReplicationTest.messageChunkReceived();")})
public void testReceiveLargeMessage() throws Exception {
MapMessage message = createLargeMessage();
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
MapMessage messageRec = null;
try {
consumer.receive(5000);
Assert.fail("Expected a failure here");
} catch (JMSException expected) {
}
session.rollback();
messageRec = (MapMessage) consumer.receive(5000);
Assert.assertNotNull(messageRec);
session.commit();
for (int i = 0; i < 10; i++) {
Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
}
}
public static void messageChunkReceived() {
messageChunkCount++;
if (messageChunkCount == 100) {
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
@Override
public void run() {
try {
latch.countDown();
liveServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
try {
// just to make sure it's about to be stopped
// avoiding bootstrapping the thread as a delay
latch.await(1, TimeUnit.MINUTES);
} catch (Throwable ignored) {
}
}
}
public static void messageChunkSent() {
messageChunkCount++;
try {
if (messageChunkCount == 10) {
liveServer.stop(true);
System.err.println("activating");
if (!backupServer.waitForActivation(1, TimeUnit.MINUTES)) {
Logger.getLogger(LargeMessageOverReplicationTest.class).warn("Can't failover server");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private MapMessage createLargeMessage() throws JMSException {
MapMessage message = session.createMapMessage();
for (int i = 0; i < 10; i++) {
message.setBytes("test" + i, new byte[1024 * 1024]);
}
return message;
}
}

View File

@ -380,7 +380,8 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
serverSessions.add(session); serverSessions.add(session);
} }
} }
} while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis()); }
while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
System.err.println("Returning " + serverSessions.size() + " sessions"); System.err.println("Returning " + serverSessions.size() + " sessions");
return serverSessions; return serverSessions;

View File

@ -156,12 +156,6 @@
<artifactId>artemis-hornetq-protocol</artifactId> <artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-features</artifactId>
<version>${project.version}</version>
<type>pom</type>
</dependency>
<!-- MQTT Deps --> <!-- MQTT Deps -->
<dependency> <dependency>

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.junit.Before;
import org.junit.Test;
public class FailoverTestWithDivert extends FailoverTestBase {
private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
private ClientSessionFactoryInternal sf;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfiguration(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return getNettyConnectorTransportConfiguration(live);
}
@Test
public void testUniqueIDsWithDivert() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
TransportConfiguration tc = createTransportConfiguration(true, false, params);
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
int minLarge = locator.getMinLargeMessageSize();
ClientSession session = sf.createSession(false, false);
addClientSession(session);
session.start();
final int num = 100;
ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage message = createLargeMessage(session, 2 * minLarge);
producer.send(message);
}
session.commit();
ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromSourceQueue = consumer.receive(5000);
assertNotNull(receivedFromSourceQueue);
receivedFromSourceQueue.acknowledge();
}
session.commit();
crash(session);
ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
for (int i = 0; i < num; i++) {
ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
assertNotNull(receivedFromTargetQueue);
receivedFromTargetQueue.acknowledge();
}
session.commit();
}
private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
ClientMessage message = session.createMessage(true);
ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
final int propSize = 10240;
while (bodyBuffer.writerIndex() < largeSize) {
byte[] prop = new byte[propSize];
bodyBuffer.writeBytes(prop);
}
return message;
}
}

View File

@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
setupServer(false); setupServer(false);
try { try {
ClientSessionFactory sf = createSessionFactory(locator); ClientSessionFactory sf = createSessionFactory(locator);
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
addActiveMQComponent(manager); addActiveMQComponent(manager);
manager.start(); manager.start();
Assert.fail("Exception was expected"); Assert.fail("Exception was expected");
@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
public void testSendPackets() throws Exception { public void testSendPackets() throws Exception {
setupServer(true); setupServer(true);
StorageManager storage = getStorage(); JournalStorageManager storage = getStorage();
manager = liveServer.getReplicationManager(); manager = liveServer.getReplicationManager();
waitForComponent(manager); waitForComponent(manager);
@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
manager.largeMessageWrite(500, new byte[1024]); manager.largeMessageWrite(500, new byte[1024]);
manager.largeMessageDelete(Long.valueOf(500)); manager.largeMessageDelete(Long.valueOf(500), storage);
blockOnReplication(storage, manager); blockOnReplication(storage, manager);

View File

@ -204,7 +204,10 @@ public class TimedBufferTest extends ActiveMQTestBase {
Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
// The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer. // The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer.
Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500); Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 450);
// ^^ there are some discounts that can happen inside the timed buffer that are still considered valid (like discounting the time it took to perform the operation itself
// for that reason the test has been failing (before this commit) at 499 or 480 milliseconds. So, I'm using a reasonable number close to 500 milliseconds that would still be valid for the test
// it should be in fact only writing once.. // it should be in fact only writing once..
// i will set for 3 just in case there's a GC or anything else happening on the test // i will set for 3 just in case there's a GC or anything else happening on the test