mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-06 10:09:01 +00:00
This closes #1304
This commit is contained in:
commit
cd5f508d60
@ -59,7 +59,8 @@ public class InputAbstract extends ActionAbstract {
|
||||
case "N":
|
||||
booleanValue = Boolean.FALSE; break;
|
||||
}
|
||||
} while (booleanValue == null);
|
||||
}
|
||||
while (booleanValue == null);
|
||||
|
||||
return booleanValue.booleanValue();
|
||||
}
|
||||
@ -85,7 +86,8 @@ public class InputAbstract extends ActionAbstract {
|
||||
} else {
|
||||
valid = true;
|
||||
}
|
||||
} while (!valid);
|
||||
}
|
||||
while (!valid);
|
||||
|
||||
return inputStr.trim();
|
||||
}
|
||||
@ -116,7 +118,8 @@ public class InputAbstract extends ActionAbstract {
|
||||
} else {
|
||||
valid = true;
|
||||
}
|
||||
} while (!valid);
|
||||
}
|
||||
while (!valid);
|
||||
|
||||
return inputStr.trim();
|
||||
}
|
||||
|
@ -78,7 +78,8 @@ public class AssertionLoggerHandler extends ExtHandler {
|
||||
if (findText(text)) {
|
||||
return true;
|
||||
}
|
||||
} while (timeMax > System.currentTimeMillis());
|
||||
}
|
||||
while (timeMax > System.currentTimeMillis());
|
||||
|
||||
return false;
|
||||
|
||||
|
@ -1311,7 +1311,8 @@ public class Base64 {
|
||||
int b = 0;
|
||||
do {
|
||||
b = in.read();
|
||||
} while (b >= 0 && decodabet[b & 0x7f] <= Base64.WHITE_SPACE_ENC);
|
||||
}
|
||||
while (b >= 0 && decodabet[b & 0x7f] <= Base64.WHITE_SPACE_ENC);
|
||||
|
||||
if (b < 0) {
|
||||
break; // Reads a -1 if end of stream
|
||||
|
@ -115,7 +115,8 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
|
||||
//we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
|
||||
//but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
|
||||
//this check fixes the issue
|
||||
} while (!tasks.isEmpty());
|
||||
}
|
||||
while (!tasks.isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -293,6 +293,7 @@ public class UUIDTimer {
|
||||
if (++counter > UUIDTimer.MAX_WAIT_COUNT) {
|
||||
break;
|
||||
}
|
||||
} while (System.currentTimeMillis() < waitUntil);
|
||||
}
|
||||
while (System.currentTimeMillis() < waitUntil);
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ public interface ICoreMessage extends Message {
|
||||
|
||||
|
||||
/** Used on large messages treatment */
|
||||
void copyHeadersAndProperties(final Message msg);
|
||||
void copyHeadersAndProperties(Message msg);
|
||||
|
||||
/**
|
||||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
|
@ -216,7 +216,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
||||
* @param autoCreated
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
void createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws ActiveMQException;
|
||||
void createAddress(SimpleString address, Set<RoutingType> routingTypes, boolean autoCreated) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Create Address with a single initial routing type
|
||||
@ -225,7 +225,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
||||
* @param autoCreated
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
void createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws ActiveMQException;
|
||||
void createAddress(SimpleString address, RoutingType routingType, boolean autoCreated) throws ActiveMQException;
|
||||
|
||||
// Queue Operations ----------------------------------------------
|
||||
|
||||
|
@ -74,7 +74,7 @@ public interface ServerLocator extends AutoCloseable {
|
||||
* @throws Exception if a failure happened in creating the ClientSessionFactory or the
|
||||
* ServerLocator does not know about the passed in transportConfiguration
|
||||
*/
|
||||
ClientSessionFactory createSessionFactory(final String nodeID) throws Exception;
|
||||
ClientSessionFactory createSessionFactory(String nodeID) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a {@link ClientSessionFactory} to a specific server. The server must already be known
|
||||
@ -86,7 +86,7 @@ public interface ServerLocator extends AutoCloseable {
|
||||
* @throws Exception if a failure happened in creating the ClientSessionFactory or the
|
||||
* ServerLocator does not know about the passed in transportConfiguration
|
||||
*/
|
||||
ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception;
|
||||
ClientSessionFactory createSessionFactory(TransportConfiguration transportConfiguration) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a {@link ClientSessionFactory} to a specific server. The server must already be known
|
||||
@ -100,7 +100,7 @@ public interface ServerLocator extends AutoCloseable {
|
||||
* @throws Exception if a failure happened in creating the ClientSessionFactory or the
|
||||
* ServerLocator does not know about the passed in transportConfiguration
|
||||
*/
|
||||
ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
|
||||
ClientSessionFactory createSessionFactory(TransportConfiguration transportConfiguration,
|
||||
int reconnectAttempts,
|
||||
boolean failoverOnInitialConnection) throws Exception;
|
||||
|
||||
|
@ -116,7 +116,7 @@ public interface AddressControl {
|
||||
*/
|
||||
@Operation(desc = "Sends a TextMessage to a password-protected address.", impact = MBeanOperationInfo.ACTION)
|
||||
String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers,
|
||||
@Parameter(name = "type", desc = "A type for the message") final int type,
|
||||
@Parameter(name = "type", desc = "A type for the message") int type,
|
||||
@Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body,
|
||||
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
|
||||
@Parameter(name = "user", desc = "The user to authenticate with") String user,
|
||||
|
@ -363,7 +363,7 @@ public interface QueueControl {
|
||||
*/
|
||||
@Operation(desc = "Sends a TextMessage to a password-protected destination.", impact = MBeanOperationInfo.ACTION)
|
||||
String sendMessage(@Parameter(name = "headers", desc = "The headers to add to the message") Map<String, String> headers,
|
||||
@Parameter(name = "type", desc = "A type for the message") final int type,
|
||||
@Parameter(name = "type", desc = "A type for the message") int type,
|
||||
@Parameter(name = "body", desc = "The body (byte[]) of the message encoded as a string using Base64") String body,
|
||||
@Parameter(name = "durable", desc = "Whether the message is durable") boolean durable,
|
||||
@Parameter(name = "user", desc = "The user to authenticate with") String user,
|
||||
|
@ -37,7 +37,7 @@ public interface ClientConsumerInternal extends ClientConsumer {
|
||||
|
||||
void handleLargeMessageContinuation(byte[] chunk, int flowControlSize, boolean isContinues) throws Exception;
|
||||
|
||||
void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException;
|
||||
void flowControl(int messageBytes, boolean discountSlowConsumer) throws ActiveMQException;
|
||||
|
||||
void clear(boolean waitForOnMessage) throws ActiveMQException;
|
||||
|
||||
|
@ -457,7 +457,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
|
||||
}
|
||||
|
||||
pos += numberOfBytesRead;
|
||||
} while (pos < minLargeMessageSize);
|
||||
}
|
||||
while (pos < minLargeMessageSize);
|
||||
|
||||
totalSize += pos;
|
||||
|
||||
|
@ -45,7 +45,7 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
|
||||
|
||||
int numSessions();
|
||||
|
||||
void removeSession(final ClientSessionInternal session, boolean failingOver);
|
||||
void removeSession(ClientSessionInternal session, boolean failingOver);
|
||||
|
||||
void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws ActiveMQException;
|
||||
|
||||
|
@ -33,7 +33,7 @@ public interface ClientSessionInternal extends ClientSession {
|
||||
|
||||
void acknowledge(ClientConsumer consumer, Message message) throws ActiveMQException;
|
||||
|
||||
void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException;
|
||||
void individualAcknowledge(ClientConsumer consumer, Message message) throws ActiveMQException;
|
||||
|
||||
boolean isCacheLargeMessageClient();
|
||||
|
||||
|
@ -46,13 +46,13 @@ public interface LargeMessageController extends ActiveMQBuffer {
|
||||
/**
|
||||
* Sets the OutputStream of this buffer to the specified output.
|
||||
*/
|
||||
void setOutputStream(final OutputStream output) throws ActiveMQException;
|
||||
void setOutputStream(OutputStream output) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Saves this buffer to the specified output. This is just a blocking version of
|
||||
* {@link #setOutputStream(OutputStream)}.
|
||||
*/
|
||||
void saveBuffer(final OutputStream output) throws ActiveMQException;
|
||||
void saveBuffer(OutputStream output) throws ActiveMQException;
|
||||
|
||||
void addPacket(byte[] chunk, int flowControlSize, boolean isContinues);
|
||||
|
||||
|
@ -792,7 +792,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
}
|
||||
|
||||
// ATM topology is never != null. Checking here just to be consistent with
|
||||
|
@ -29,7 +29,7 @@ public interface ServerLocatorInternal extends ServerLocator {
|
||||
|
||||
void start(Executor executor) throws Exception;
|
||||
|
||||
void factoryClosed(final ClientSessionFactory factory);
|
||||
void factoryClosed(ClientSessionFactory factory);
|
||||
|
||||
AfterConnectInternalListener getAfterConnectInternalListener();
|
||||
|
||||
|
@ -73,7 +73,7 @@ public interface Channel {
|
||||
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
|
||||
* successful
|
||||
*/
|
||||
boolean send(Packet packet, final int reconnectID);
|
||||
boolean send(Packet packet, int reconnectID);
|
||||
|
||||
/**
|
||||
* Sends a packet on this channel using batching algorithm if appropriate
|
||||
|
@ -324,7 +324,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
|
||||
inCreateSession = false;
|
||||
inCreateSessionLatch.countDown();
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);
|
||||
|
||||
}
|
||||
|
@ -676,7 +676,8 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} while (retry && !session.isClosing());
|
||||
}
|
||||
while (retry && !session.isClosing());
|
||||
}
|
||||
|
||||
protected CreateSessionMessage newCreateSession(String username,
|
||||
|
@ -160,7 +160,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SUB
|
||||
|
||||
public abstract class PacketDecoder implements Serializable {
|
||||
|
||||
public abstract Packet decode(final ActiveMQBuffer in);
|
||||
public abstract Packet decode(ActiveMQBuffer in);
|
||||
|
||||
public Packet decode(byte packetType) {
|
||||
Packet packet;
|
||||
|
@ -58,13 +58,13 @@ public interface ClientProtocolManager {
|
||||
|
||||
void ping(long connectionTTL);
|
||||
|
||||
SessionContext createSessionContext(final String name,
|
||||
final String username,
|
||||
final String password,
|
||||
final boolean xa,
|
||||
final boolean autoCommitSends,
|
||||
final boolean autoCommitAcks,
|
||||
final boolean preAcknowledge,
|
||||
SessionContext createSessionContext(String name,
|
||||
String username,
|
||||
String password,
|
||||
boolean xa,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks,
|
||||
boolean preAcknowledge,
|
||||
int minLargeMessageSize,
|
||||
int confirmationWindowSize) throws ActiveMQException;
|
||||
|
||||
|
@ -94,7 +94,7 @@ public abstract class SessionContext {
|
||||
}
|
||||
|
||||
protected void handleReceiveMessage(ConsumerContext consumerID,
|
||||
final ClientMessageInternal message) throws Exception {
|
||||
ClientMessageInternal message) throws Exception {
|
||||
|
||||
ClientSessionInternal session = this.session;
|
||||
if (session != null) {
|
||||
@ -102,7 +102,7 @@ public abstract class SessionContext {
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleReceiveContinuation(final ConsumerContext consumerID,
|
||||
protected void handleReceiveContinuation(ConsumerContext consumerID,
|
||||
byte[] chunk,
|
||||
int flowControlSize,
|
||||
boolean isContinues) throws Exception {
|
||||
@ -159,7 +159,7 @@ public abstract class SessionContext {
|
||||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
|
||||
|
||||
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
|
||||
public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
|
||||
|
||||
/**
|
||||
* Creates a shared queue using the routing type set by the Address. If the Address supports more than one type of delivery
|
||||
@ -210,7 +210,7 @@ public abstract class SessionContext {
|
||||
|
||||
public abstract void forceDelivery(ClientConsumer consumer, long sequence) throws ActiveMQException;
|
||||
|
||||
public abstract ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException;
|
||||
public abstract ClientSession.AddressQuery addressQuery(SimpleString address) throws ActiveMQException;
|
||||
|
||||
public abstract void simpleCommit() throws ActiveMQException;
|
||||
|
||||
@ -231,10 +231,10 @@ public abstract class SessionContext {
|
||||
|
||||
public abstract void sendACK(boolean individual,
|
||||
boolean block,
|
||||
final ClientConsumer consumer,
|
||||
final Message message) throws ActiveMQException;
|
||||
ClientConsumer consumer,
|
||||
Message message) throws ActiveMQException;
|
||||
|
||||
public abstract void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException;
|
||||
public abstract void expireMessage(ClientConsumer consumer, Message message) throws ActiveMQException;
|
||||
|
||||
public abstract void sessionClose() throws ActiveMQException;
|
||||
|
||||
@ -242,7 +242,7 @@ public abstract class SessionContext {
|
||||
|
||||
public abstract void addUniqueMetaData(String key, String data) throws ActiveMQException;
|
||||
|
||||
public abstract void sendProducerCreditsMessage(final int credits, final SimpleString address);
|
||||
public abstract void sendProducerCreditsMessage(int credits, SimpleString address);
|
||||
|
||||
public abstract void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException;
|
||||
|
||||
@ -278,13 +278,13 @@ public abstract class SessionContext {
|
||||
|
||||
public abstract int getServerVersion();
|
||||
|
||||
public abstract void recreateSession(final String username,
|
||||
final String password,
|
||||
final int minLargeMessageSize,
|
||||
final boolean xa,
|
||||
final boolean autoCommitSends,
|
||||
final boolean autoCommitAcks,
|
||||
final boolean preAcknowledge) throws ActiveMQException;
|
||||
public abstract void recreateSession(String username,
|
||||
String password,
|
||||
int minLargeMessageSize,
|
||||
boolean xa,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks,
|
||||
boolean preAcknowledge) throws ActiveMQException;
|
||||
|
||||
public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException;
|
||||
|
||||
|
@ -26,12 +26,12 @@ public interface TopologyResponseHandler {
|
||||
void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID);
|
||||
|
||||
void notifyNodeUp(long uniqueEventID,
|
||||
final String backupGroupName,
|
||||
final String scaleDownGroupName,
|
||||
final String nodeName,
|
||||
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
|
||||
final boolean isLast);
|
||||
String backupGroupName,
|
||||
String scaleDownGroupName,
|
||||
String nodeName,
|
||||
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
|
||||
boolean isLast);
|
||||
|
||||
// This is sent when any node on the cluster topology is going down
|
||||
void notifyNodeDown(final long eventTime, final String nodeID);
|
||||
void notifyNodeDown(long eventTime, String nodeID);
|
||||
}
|
||||
|
@ -117,7 +117,8 @@ public class JDBCUtils {
|
||||
formatSqlException(errorMessage, nextEx);
|
||||
nextEx = exception.getNextException();
|
||||
level++;
|
||||
} while (nextEx != null);
|
||||
}
|
||||
while (nextEx != null);
|
||||
return errorMessage;
|
||||
}
|
||||
|
||||
|
@ -260,7 +260,7 @@ public enum JMSFactoryType {
|
||||
* @param groupConfiguration
|
||||
* @return the ActiveMQConnectionFactory
|
||||
*/
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithHA(final DiscoveryGroupConfiguration groupConfiguration);
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithHA(DiscoveryGroupConfiguration groupConfiguration);
|
||||
|
||||
/**
|
||||
* Create an ActiveMQConnectionFactory which creates session factories from a set of live servers, no HA backup information is propagated to the client
|
||||
@ -270,7 +270,7 @@ public enum JMSFactoryType {
|
||||
* @param groupConfiguration
|
||||
* @return the ActiveMQConnectionFactory
|
||||
*/
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final DiscoveryGroupConfiguration groupConfiguration);
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithoutHA(DiscoveryGroupConfiguration groupConfiguration);
|
||||
|
||||
/**
|
||||
* Create an ActiveMQConnectionFactory which will receive cluster topology updates from the cluster
|
||||
@ -287,7 +287,7 @@ public enum JMSFactoryType {
|
||||
* is made, the cluster topology is downloaded and the rest of the list is ignored.
|
||||
* @return the ActiveMQConnectionFactory
|
||||
*/
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithHA(final TransportConfiguration... initialServers);
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithHA(TransportConfiguration... initialServers);
|
||||
|
||||
/**
|
||||
* Create an ActiveMQConnectionFactory which creates session factories using a static list of
|
||||
@ -299,7 +299,7 @@ public enum JMSFactoryType {
|
||||
* @param transportConfigurations
|
||||
* @return the ActiveMQConnectionFactory
|
||||
*/
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations);
|
||||
public abstract ActiveMQConnectionFactory createConnectionFactoryWithoutHA(TransportConfiguration... transportConfigurations);
|
||||
|
||||
/**
|
||||
* Returns the connection factory interface that this JMSFactoryType creates.
|
||||
|
@ -1314,7 +1314,8 @@ public final class JMSBridgeImpl implements JMSBridge {
|
||||
ActiveMQJMSBridgeLogger.LOGGER.warn(e.getMessage() + ", retrying TX", e);
|
||||
exHappened = true;
|
||||
}
|
||||
} while (exHappened);
|
||||
}
|
||||
while (exHappened);
|
||||
|
||||
if (maxBatchSize > 1) {
|
||||
// The sending session is transacted - we need to commit it
|
||||
|
@ -28,12 +28,12 @@ public interface JMSServerConfigParser {
|
||||
/**
|
||||
* Parse the JMS Configuration XML as a JMSConfiguration object
|
||||
*/
|
||||
JMSConfiguration parseConfiguration(final InputStream stream) throws Exception;
|
||||
JMSConfiguration parseConfiguration(InputStream stream) throws Exception;
|
||||
|
||||
/**
|
||||
* Parse the JMS Configuration XML as a JMSConfiguration object
|
||||
*/
|
||||
JMSConfiguration parseConfiguration(final Node rootnode) throws Exception;
|
||||
JMSConfiguration parseConfiguration(Node rootnode) throws Exception;
|
||||
|
||||
/**
|
||||
* Parse the topic node as a TopicConfiguration object
|
||||
@ -42,7 +42,7 @@ public interface JMSServerConfigParser {
|
||||
* @return {@link TopicConfiguration} parsed from the node
|
||||
* @throws Exception
|
||||
*/
|
||||
TopicConfiguration parseTopicConfiguration(final Node node) throws Exception;
|
||||
TopicConfiguration parseTopicConfiguration(Node node) throws Exception;
|
||||
|
||||
/**
|
||||
* Parse the Queue Configuration node as a QueueConfiguration object
|
||||
@ -51,5 +51,5 @@ public interface JMSServerConfigParser {
|
||||
* @return {@link JMSQueueConfiguration} parsed from the node
|
||||
* @throws Exception
|
||||
*/
|
||||
JMSQueueConfiguration parseQueueConfiguration(final Node node) throws Exception;
|
||||
JMSQueueConfiguration parseQueueConfiguration(Node node) throws Exception;
|
||||
}
|
||||
|
@ -60,11 +60,11 @@ public interface JMSServerManager extends ActiveMQComponent {
|
||||
boolean durable,
|
||||
String... bindings) throws Exception;
|
||||
|
||||
boolean addTopicToBindingRegistry(final String topicName, final String binding) throws Exception;
|
||||
boolean addTopicToBindingRegistry(String topicName, String binding) throws Exception;
|
||||
|
||||
boolean addQueueToBindingRegistry(final String queueName, final String binding) throws Exception;
|
||||
boolean addQueueToBindingRegistry(String queueName, String binding) throws Exception;
|
||||
|
||||
boolean addConnectionFactoryToBindingRegistry(final String name, final String binding) throws Exception;
|
||||
boolean addConnectionFactoryToBindingRegistry(String name, String binding) throws Exception;
|
||||
|
||||
/**
|
||||
* Creates a JMS Topic
|
||||
@ -306,7 +306,7 @@ public interface JMSServerManager extends ActiveMQComponent {
|
||||
|
||||
void addSecurity(String addressMatch, Set<Role> roles);
|
||||
|
||||
Set<Role> getSecurity(final String addressMatch);
|
||||
Set<Role> getSecurity(String addressMatch);
|
||||
|
||||
BindingRegistry getRegistry();
|
||||
|
||||
|
@ -98,12 +98,12 @@ public interface Journal extends ActiveMQComponent {
|
||||
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
|
||||
}
|
||||
|
||||
void appendUpdateRecord(final long id,
|
||||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record,
|
||||
final boolean sync,
|
||||
final IOCompletion callback) throws Exception;
|
||||
void appendUpdateRecord(long id,
|
||||
byte recordType,
|
||||
Persister persister,
|
||||
Object record,
|
||||
boolean sync,
|
||||
IOCompletion callback) throws Exception;
|
||||
|
||||
void appendDeleteRecord(long id, boolean sync) throws Exception;
|
||||
|
||||
@ -117,11 +117,11 @@ public interface Journal extends ActiveMQComponent {
|
||||
appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record);
|
||||
}
|
||||
|
||||
void appendAddRecordTransactional(final long txID,
|
||||
final long id,
|
||||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record) throws Exception;
|
||||
void appendAddRecordTransactional(long txID,
|
||||
long id,
|
||||
byte recordType,
|
||||
Persister persister,
|
||||
Object record) throws Exception;
|
||||
|
||||
void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
|
||||
|
||||
|
@ -271,7 +271,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 144006, value = "IOError code {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void ioError(final int errorCode, final String errorMessage);
|
||||
void ioError(int errorCode, String errorMessage);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 144007, value = "Ignoring journal file {0}: file is shorter then minimum header size. This file is being removed.", format = Message.Format.MESSAGE_FORMAT)
|
||||
|
@ -163,7 +163,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
||||
} finally {
|
||||
server.getStorageManager().clearContext();
|
||||
}
|
||||
} while (conn.hasBytes());
|
||||
}
|
||||
while (conn.hasBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -311,7 +311,7 @@ public interface Configuration {
|
||||
*/
|
||||
Configuration setAcceptorConfigurations(Set<TransportConfiguration> infos);
|
||||
|
||||
Configuration addAcceptorConfiguration(final TransportConfiguration infos);
|
||||
Configuration addAcceptorConfiguration(TransportConfiguration infos);
|
||||
|
||||
/**
|
||||
* Add an acceptor to the config
|
||||
@ -335,9 +335,9 @@ public interface Configuration {
|
||||
*/
|
||||
Configuration setConnectorConfigurations(Map<String, TransportConfiguration> infos);
|
||||
|
||||
Configuration addConnectorConfiguration(final String key, final TransportConfiguration info);
|
||||
Configuration addConnectorConfiguration(String key, TransportConfiguration info);
|
||||
|
||||
Configuration addConnectorConfiguration(final String name, final String uri) throws Exception;
|
||||
Configuration addConnectorConfiguration(String name, String uri) throws Exception;
|
||||
|
||||
Configuration clearConnectorConfigurations();
|
||||
|
||||
@ -351,7 +351,7 @@ public interface Configuration {
|
||||
*/
|
||||
Configuration setBroadcastGroupConfigurations(List<BroadcastGroupConfiguration> configs);
|
||||
|
||||
Configuration addBroadcastGroupConfiguration(final BroadcastGroupConfiguration config);
|
||||
Configuration addBroadcastGroupConfiguration(BroadcastGroupConfiguration config);
|
||||
|
||||
/**
|
||||
* Returns the discovery groups configured for this server.
|
||||
@ -363,7 +363,7 @@ public interface Configuration {
|
||||
*/
|
||||
Configuration setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
|
||||
|
||||
Configuration addDiscoveryGroupConfiguration(final String key,
|
||||
Configuration addDiscoveryGroupConfiguration(String key,
|
||||
DiscoveryGroupConfiguration discoveryGroupConfiguration);
|
||||
|
||||
/**
|
||||
@ -384,7 +384,7 @@ public interface Configuration {
|
||||
/**
|
||||
* Sets the bridges configured for this server.
|
||||
*/
|
||||
Configuration setBridgeConfigurations(final List<BridgeConfiguration> configs);
|
||||
Configuration setBridgeConfigurations(List<BridgeConfiguration> configs);
|
||||
|
||||
/**
|
||||
* Returns the diverts configured for this server.
|
||||
@ -394,9 +394,9 @@ public interface Configuration {
|
||||
/**
|
||||
* Sets the diverts configured for this server.
|
||||
*/
|
||||
Configuration setDivertConfigurations(final List<DivertConfiguration> configs);
|
||||
Configuration setDivertConfigurations(List<DivertConfiguration> configs);
|
||||
|
||||
Configuration addDivertConfiguration(final DivertConfiguration config);
|
||||
Configuration addDivertConfiguration(DivertConfiguration config);
|
||||
|
||||
/**
|
||||
* Returns the cluster connections configured for this server.
|
||||
@ -409,9 +409,9 @@ public interface Configuration {
|
||||
/**
|
||||
* Sets the cluster connections configured for this server.
|
||||
*/
|
||||
Configuration setClusterConfigurations(final List<ClusterConnectionConfiguration> configs);
|
||||
Configuration setClusterConfigurations(List<ClusterConnectionConfiguration> configs);
|
||||
|
||||
Configuration addClusterConfiguration(final ClusterConnectionConfiguration config);
|
||||
Configuration addClusterConfiguration(ClusterConnectionConfiguration config);
|
||||
|
||||
ClusterConnectionConfiguration addClusterConfiguration(String name, String uri) throws Exception;
|
||||
|
||||
@ -425,9 +425,9 @@ public interface Configuration {
|
||||
/**
|
||||
* Sets the queues configured for this server.
|
||||
*/
|
||||
Configuration setQueueConfigurations(final List<CoreQueueConfiguration> configs);
|
||||
Configuration setQueueConfigurations(List<CoreQueueConfiguration> configs);
|
||||
|
||||
Configuration addQueueConfiguration(final CoreQueueConfiguration config);
|
||||
Configuration addQueueConfiguration(CoreQueueConfiguration config);
|
||||
|
||||
/**
|
||||
* Returns the addresses configured for this server.
|
||||
@ -437,12 +437,12 @@ public interface Configuration {
|
||||
/**
|
||||
* Sets the addresses configured for this server.
|
||||
*/
|
||||
Configuration setAddressConfigurations(final List<CoreAddressConfiguration> configs);
|
||||
Configuration setAddressConfigurations(List<CoreAddressConfiguration> configs);
|
||||
|
||||
/**
|
||||
* Adds an addresses configuration
|
||||
*/
|
||||
Configuration addAddressConfiguration(final CoreAddressConfiguration config);
|
||||
Configuration addAddressConfiguration(CoreAddressConfiguration config);
|
||||
|
||||
/**
|
||||
* Returns the management address of this server. <br>
|
||||
@ -938,9 +938,9 @@ public interface Configuration {
|
||||
|
||||
Configuration addConnectorServiceConfiguration(ConnectorServiceConfiguration config);
|
||||
|
||||
Configuration setSecuritySettingPlugins(final List<SecuritySettingPlugin> plugins);
|
||||
Configuration setSecuritySettingPlugins(List<SecuritySettingPlugin> plugins);
|
||||
|
||||
Configuration addSecuritySettingPlugin(final SecuritySettingPlugin plugin);
|
||||
Configuration addSecuritySettingPlugin(SecuritySettingPlugin plugin);
|
||||
|
||||
/**
|
||||
* @return list of {@link ConnectorServiceConfiguration}
|
||||
|
@ -45,10 +45,10 @@ public interface PageTransactionInfo extends EncodingSupport {
|
||||
|
||||
void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
|
||||
|
||||
void reloadUpdate(final StorageManager storageManager,
|
||||
final PagingManager pagingManager,
|
||||
final Transaction tx,
|
||||
final int increment) throws Exception;
|
||||
void reloadUpdate(StorageManager storageManager,
|
||||
PagingManager pagingManager,
|
||||
Transaction tx,
|
||||
int increment) throws Exception;
|
||||
|
||||
// To be used after the update was stored or reload
|
||||
void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
|
||||
|
@ -93,9 +93,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
|
||||
*/
|
||||
boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
|
||||
|
||||
Page createPage(final int page) throws Exception;
|
||||
Page createPage(int page) throws Exception;
|
||||
|
||||
boolean checkPageFileExists(final int page) throws Exception;
|
||||
boolean checkPageFileExists(int page) throws Exception;
|
||||
|
||||
PagingManager getPagingManager();
|
||||
|
||||
|
@ -31,7 +31,7 @@ public interface PageCursorProvider {
|
||||
|
||||
PageCache getPageCache(long pageNr);
|
||||
|
||||
PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
|
||||
PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription sub);
|
||||
|
||||
void addPageCache(PageCache cache);
|
||||
|
||||
|
@ -64,7 +64,7 @@ public interface PageSubscription {
|
||||
|
||||
void scheduleCleanupCheck();
|
||||
|
||||
void cleanupEntries(final boolean completeDelete) throws Exception;
|
||||
void cleanupEntries(boolean completeDelete) throws Exception;
|
||||
|
||||
void onPageModeCleared(Transaction tx) throws Exception;
|
||||
|
||||
@ -118,7 +118,7 @@ public interface PageSubscription {
|
||||
|
||||
void processReload() throws Exception;
|
||||
|
||||
void addPendingDelivery(final PagePosition position);
|
||||
void addPendingDelivery(PagePosition position);
|
||||
|
||||
/**
|
||||
* To be used on redeliveries
|
||||
|
@ -28,9 +28,9 @@ public interface PageSubscriptionCounter {
|
||||
|
||||
void increment(Transaction tx, int add) throws Exception;
|
||||
|
||||
void loadValue(final long recordValueID, final long value);
|
||||
void loadValue(long recordValueID, long value);
|
||||
|
||||
void loadInc(final long recordInd, final int add);
|
||||
void loadInc(long recordInd, int add);
|
||||
|
||||
void applyIncrementOnTX(Transaction tx, long recordID, int add);
|
||||
|
||||
@ -54,6 +54,6 @@ public interface PageSubscriptionCounter {
|
||||
// for each queue on the address
|
||||
void delete(Transaction tx) throws Exception;
|
||||
|
||||
void cleanupNonTXCounters(final long pageID) throws Exception;
|
||||
void cleanupNonTXCounters(long pageID) throws Exception;
|
||||
|
||||
}
|
@ -1257,7 +1257,8 @@ final class PageSubscriptionImpl implements PageSubscription {
|
||||
} else if (!browsing && ignored) {
|
||||
positionIgnored(message.getPosition());
|
||||
}
|
||||
} while (!match);
|
||||
}
|
||||
while (!match);
|
||||
|
||||
if (message != null) {
|
||||
lastOperation = lastPosition;
|
||||
|
@ -246,7 +246,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
||||
* @param extension the extension to add to the file
|
||||
* @return
|
||||
*/
|
||||
SequentialFile createFileForLargeMessage(final long messageID, LargeMessageExtension extension);
|
||||
SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
|
||||
|
||||
void prepare(long txID, Xid xid) throws Exception;
|
||||
|
||||
@ -266,14 +266,14 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
||||
|
||||
void deletePageTransactional(long recordID) throws Exception;
|
||||
|
||||
JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
|
||||
final PagingManager pagingManager,
|
||||
final ResourceManager resourceManager,
|
||||
JournalLoadInformation loadMessageJournal(PostOffice postOffice,
|
||||
PagingManager pagingManager,
|
||||
ResourceManager resourceManager,
|
||||
Map<Long, QueueBindingInfo> queueInfos,
|
||||
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
|
||||
final Set<Pair<Long, Long>> pendingLargeMessages,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
|
||||
Set<Pair<Long, Long>> pendingLargeMessages,
|
||||
List<PageCountPending> pendingNonTXPageCounter,
|
||||
final JournalLoader journalLoader) throws Exception;
|
||||
JournalLoader journalLoader) throws Exception;
|
||||
|
||||
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
|
||||
|
||||
|
@ -25,7 +25,7 @@ public interface DuplicateIDCache {
|
||||
|
||||
boolean contains(byte[] duplicateID);
|
||||
|
||||
boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception;
|
||||
boolean atomicVerify(byte[] duplID, Transaction tx) throws Exception;
|
||||
|
||||
void addToCache(byte[] duplicateID) throws Exception;
|
||||
|
||||
@ -42,7 +42,7 @@ public interface DuplicateIDCache {
|
||||
|
||||
void load(List<Pair<byte[], Long>> theIds) throws Exception;
|
||||
|
||||
void load(final Transaction tx, final byte[] duplID);
|
||||
void load(Transaction tx, byte[] duplID);
|
||||
|
||||
void clear() throws Exception;
|
||||
|
||||
|
@ -119,10 +119,10 @@ public interface PostOffice extends ActiveMQComponent {
|
||||
MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception;
|
||||
|
||||
Pair<RoutingContext, Message> redistribute(Message message,
|
||||
final Queue originatingQueue,
|
||||
Queue originatingQueue,
|
||||
Transaction tx) throws Exception;
|
||||
|
||||
void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception;
|
||||
void processRoute(Message message, RoutingContext context, boolean direct) throws Exception;
|
||||
|
||||
DuplicateIDCache getDuplicateIDCache(SimpleString address);
|
||||
|
||||
@ -133,7 +133,7 @@ public interface PostOffice extends ActiveMQComponent {
|
||||
// we can't start expiry scanner until the system is load otherwise we may get weird races - https://issues.jboss.org/browse/HORNETQ-1142
|
||||
void startExpiryScanner();
|
||||
|
||||
boolean isAddressBound(final SimpleString address) throws Exception;
|
||||
boolean isAddressBound(SimpleString address) throws Exception;
|
||||
|
||||
Set<SimpleString> getAddresses();
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
|
||||
public interface ConnectionCreator extends ChannelHandler {
|
||||
|
||||
NettyServerConnection createConnection(final ChannelHandlerContext ctx,
|
||||
NettyServerConnection createConnection(ChannelHandlerContext ctx,
|
||||
String protocol,
|
||||
boolean httpEnabled) throws Exception;
|
||||
}
|
||||
|
@ -144,7 +144,8 @@ public class HttpAcceptorHandler extends ChannelDuplexHandler {
|
||||
return;
|
||||
// otherwise ignore, we'll just try again
|
||||
}
|
||||
} while (responseHolder == null);
|
||||
}
|
||||
while (responseHolder == null);
|
||||
if (!bogusResponse) {
|
||||
piggyBackResponses(responseHolder.response.content());
|
||||
responseHolder.response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(responseHolder.response.content().readableBytes()));
|
||||
@ -175,7 +176,8 @@ public class HttpAcceptorHandler extends ChannelDuplexHandler {
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
} while (responses.isEmpty());
|
||||
}
|
||||
while (responses.isEmpty());
|
||||
return;
|
||||
}
|
||||
buf.writeBytes(buffer);
|
||||
|
@ -534,8 +534,8 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
|
||||
|
||||
try {
|
||||
try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
|
||||
final FileChannel channel = fis.getChannel()) {
|
||||
try (FileInputStream fis = new FileInputStream(file.getJavaFile());
|
||||
FileChannel channel = fis.getChannel()) {
|
||||
// We can afford having a single buffer here for this entire loop
|
||||
// because sendReplicatePacket will encode the packet as a NettyBuffer
|
||||
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
|
||||
|
@ -78,5 +78,5 @@ public enum CheckType {
|
||||
}
|
||||
};
|
||||
|
||||
public abstract boolean hasRole(final Role role);
|
||||
public abstract boolean hasRole(Role role);
|
||||
}
|
||||
|
@ -296,14 +296,14 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||
* @throws org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filterString}
|
||||
* @throws NullPointerException if {@code address} is {@code null}
|
||||
*/
|
||||
void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString,
|
||||
final SimpleString user, boolean durable) throws Exception;
|
||||
void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
|
||||
SimpleString user, boolean durable) throws Exception;
|
||||
|
||||
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
|
||||
boolean durable, boolean temporary) throws Exception;
|
||||
|
||||
Queue createQueue(final SimpleString address, final RoutingType routingType, final SimpleString queueName, final SimpleString user,
|
||||
final SimpleString filterString, final boolean durable, final boolean temporary) throws Exception;
|
||||
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString user,
|
||||
SimpleString filterString, boolean durable, boolean temporary) throws Exception;
|
||||
|
||||
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
|
||||
boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers,
|
||||
|
@ -87,15 +87,15 @@ public interface Queue extends Bindable {
|
||||
|
||||
void addHead(MessageReference ref, boolean scheduling);
|
||||
|
||||
void addHead(final List<MessageReference> refs, boolean scheduling);
|
||||
void addHead(List<MessageReference> refs, boolean scheduling);
|
||||
|
||||
void acknowledge(MessageReference ref) throws Exception;
|
||||
|
||||
void acknowledge(final MessageReference ref, AckReason reason) throws Exception;
|
||||
void acknowledge(MessageReference ref, AckReason reason) throws Exception;
|
||||
|
||||
void acknowledge(Transaction tx, MessageReference ref) throws Exception;
|
||||
|
||||
void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception;
|
||||
void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception;
|
||||
|
||||
void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
|
||||
|
||||
@ -152,7 +152,7 @@ public interface Queue extends Bindable {
|
||||
|
||||
int deleteAllReferences() throws Exception;
|
||||
|
||||
int deleteAllReferences(final int flushLimit) throws Exception;
|
||||
int deleteAllReferences(int flushLimit) throws Exception;
|
||||
|
||||
boolean deleteReference(long messageID) throws Exception;
|
||||
|
||||
@ -175,7 +175,7 @@ public interface Queue extends Bindable {
|
||||
|
||||
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
|
||||
|
||||
void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception;
|
||||
void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
|
||||
|
||||
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
|
||||
|
||||
@ -187,7 +187,7 @@ public interface Queue extends Bindable {
|
||||
|
||||
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
|
||||
|
||||
int moveReferences(final int flushLimit,
|
||||
int moveReferences(int flushLimit,
|
||||
Filter filter,
|
||||
SimpleString toAddress,
|
||||
boolean rejectDuplicates) throws Exception;
|
||||
|
@ -29,14 +29,14 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
*/
|
||||
public interface QueueFactory {
|
||||
|
||||
Queue createQueueWith(final QueueConfig config) throws Exception;
|
||||
Queue createQueueWith(QueueConfig config) throws Exception;
|
||||
|
||||
/**
|
||||
* @deprecated Replaced by {@link #createQueueWith}
|
||||
*/
|
||||
@Deprecated
|
||||
Queue createQueue(long persistenceID,
|
||||
final SimpleString address,
|
||||
SimpleString address,
|
||||
SimpleString name,
|
||||
Filter filter,
|
||||
PageSubscription pageSubscription,
|
||||
|
@ -23,7 +23,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
|
||||
|
||||
public interface ScheduledDeliveryHandler {
|
||||
|
||||
boolean checkAndSchedule(MessageReference ref, final boolean tail);
|
||||
boolean checkAndSchedule(MessageReference ref, boolean tail);
|
||||
|
||||
int getScheduledCount();
|
||||
|
||||
|
@ -92,9 +92,9 @@ public interface ServerConsumer extends Consumer {
|
||||
|
||||
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
|
||||
|
||||
void reject(final long messageID) throws Exception;
|
||||
void reject(long messageID) throws Exception;
|
||||
|
||||
void individualCancel(final long messageID, boolean failed) throws Exception;
|
||||
void individualCancel(long messageID, boolean failed) throws Exception;
|
||||
|
||||
void forceDelivery(long sequence);
|
||||
|
||||
|
@ -60,7 +60,7 @@ public interface ServerSession extends SecurityAuth {
|
||||
|
||||
void individualAcknowledge(long consumerID, long messageID) throws Exception;
|
||||
|
||||
void individualCancel(final long consumerID, final long messageID, boolean failed) throws Exception;
|
||||
void individualCancel(long consumerID, long messageID, boolean failed) throws Exception;
|
||||
|
||||
void expire(long consumerID, long messageID) throws Exception;
|
||||
|
||||
@ -149,13 +149,13 @@ public interface ServerSession extends SecurityAuth {
|
||||
boolean durable,
|
||||
boolean autoCreated) throws Exception;
|
||||
|
||||
AddressInfo createAddress(final SimpleString address,
|
||||
AddressInfo createAddress(SimpleString address,
|
||||
Set<RoutingType> routingTypes,
|
||||
final boolean autoCreated) throws Exception;
|
||||
boolean autoCreated) throws Exception;
|
||||
|
||||
AddressInfo createAddress(final SimpleString address,
|
||||
AddressInfo createAddress(SimpleString address,
|
||||
RoutingType routingType,
|
||||
final boolean autoCreated) throws Exception;
|
||||
boolean autoCreated) throws Exception;
|
||||
|
||||
void deleteQueue(SimpleString name) throws Exception;
|
||||
|
||||
@ -164,12 +164,12 @@ public interface ServerSession extends SecurityAuth {
|
||||
SimpleString filterString,
|
||||
boolean browseOnly) throws Exception;
|
||||
|
||||
ServerConsumer createConsumer(final long consumerID,
|
||||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final boolean browseOnly,
|
||||
final boolean supportLargeMessage,
|
||||
final Integer credits) throws Exception;
|
||||
ServerConsumer createConsumer(long consumerID,
|
||||
SimpleString queueName,
|
||||
SimpleString filterString,
|
||||
boolean browseOnly,
|
||||
boolean supportLargeMessage,
|
||||
Integer credits) throws Exception;
|
||||
|
||||
QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
|
||||
|
||||
@ -186,11 +186,11 @@ public interface ServerSession extends SecurityAuth {
|
||||
boolean direct,
|
||||
boolean noAutoCreateQueue) throws Exception;
|
||||
|
||||
RoutingStatus doSend(final Transaction tx,
|
||||
final Message msg,
|
||||
final SimpleString originalAddress,
|
||||
final boolean direct,
|
||||
final boolean noAutoCreateQueue) throws Exception;
|
||||
RoutingStatus doSend(Transaction tx,
|
||||
Message msg,
|
||||
SimpleString originalAddress,
|
||||
boolean direct,
|
||||
boolean noAutoCreateQueue) throws Exception;
|
||||
|
||||
RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
|
||||
|
||||
@ -239,7 +239,7 @@ public interface ServerSession extends SecurityAuth {
|
||||
|
||||
void createSharedQueue(SimpleString address,
|
||||
SimpleString name,
|
||||
final RoutingType routingType,
|
||||
RoutingType routingType,
|
||||
boolean durable,
|
||||
SimpleString filterString) throws Exception;
|
||||
|
||||
|
@ -31,9 +31,9 @@ public abstract class Vote<T> {
|
||||
return map;
|
||||
}
|
||||
|
||||
public abstract void encode(final ActiveMQBuffer buff);
|
||||
public abstract void encode(ActiveMQBuffer buff);
|
||||
|
||||
public abstract void decode(final ActiveMQBuffer buff);
|
||||
public abstract void decode(ActiveMQBuffer buff);
|
||||
|
||||
//whether or note we should ask the target server for an answer or decide ourselves, for instance if we couldn't
|
||||
//connect to the node in the first place.
|
||||
|
@ -157,7 +157,8 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract {
|
||||
if (response != null) {
|
||||
break;
|
||||
}
|
||||
} while (timeLimit > System.currentTimeMillis());
|
||||
}
|
||||
while (timeLimit > System.currentTimeMillis());
|
||||
} finally {
|
||||
if (notification != null) {
|
||||
pendingNotifications.remove(notification);
|
||||
|
@ -139,7 +139,8 @@ public class FileLockNodeManager extends NodeManager {
|
||||
logger.debug("acquired live node lock state = " + (char) state);
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
while (true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -317,7 +318,8 @@ public class FileLockNodeManager extends NodeManager {
|
||||
interrupted = false;
|
||||
throw new IOException("Lock was interrupted");
|
||||
}
|
||||
} while (lock == null);
|
||||
}
|
||||
while (lock == null);
|
||||
return lock;
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,8 @@ public final class InVMNodeManager extends NodeManager {
|
||||
} else if (state == LIVE) {
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
while (true);
|
||||
if (failoverPause > 0L) {
|
||||
Thread.sleep(failoverPause);
|
||||
}
|
||||
|
@ -845,7 +845,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||
ref.acknowledge(tx);
|
||||
|
||||
acks++;
|
||||
} while (ref.getMessage().getMessageID() != messageID);
|
||||
}
|
||||
while (ref.getMessage().getMessageID() != messageID);
|
||||
|
||||
if (startedTransaction) {
|
||||
tx.commit();
|
||||
|
@ -293,7 +293,8 @@ public final class SharedNothingBackupActivation extends Activation {
|
||||
replicationEndpoint.getChannel().close();
|
||||
replicationEndpoint.setChannel(null);
|
||||
}
|
||||
} while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
|
||||
}
|
||||
while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Activation loop finished, current signal = " + signal);
|
||||
|
@ -68,29 +68,29 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
|
||||
|
||||
void setStorageManager(StorageManager storageManager);
|
||||
|
||||
ActiveMQServerControlImpl registerServer(final PostOffice postOffice,
|
||||
final SecurityStore securityStore,
|
||||
final StorageManager storageManager,
|
||||
final Configuration configuration,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final HierarchicalRepository<Set<Role>> securityRepository,
|
||||
final ResourceManager resourceManager,
|
||||
final RemotingService remotingService,
|
||||
final ActiveMQServer messagingServer,
|
||||
final QueueFactory queueFactory,
|
||||
final ScheduledExecutorService scheduledThreadPool,
|
||||
final PagingManager pagingManager,
|
||||
final boolean backup) throws Exception;
|
||||
ActiveMQServerControlImpl registerServer(PostOffice postOffice,
|
||||
SecurityStore securityStore,
|
||||
StorageManager storageManager,
|
||||
Configuration configuration,
|
||||
HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
HierarchicalRepository<Set<Role>> securityRepository,
|
||||
ResourceManager resourceManager,
|
||||
RemotingService remotingService,
|
||||
ActiveMQServer messagingServer,
|
||||
QueueFactory queueFactory,
|
||||
ScheduledExecutorService scheduledThreadPool,
|
||||
PagingManager pagingManager,
|
||||
boolean backup) throws Exception;
|
||||
|
||||
void unregisterServer() throws Exception;
|
||||
|
||||
void registerInJMX(ObjectName objectName, Object managedResource) throws Exception;
|
||||
|
||||
void unregisterFromJMX(final ObjectName objectName) throws Exception;
|
||||
void unregisterFromJMX(ObjectName objectName) throws Exception;
|
||||
|
||||
void registerInRegistry(String resourceName, Object managedResource);
|
||||
|
||||
void unregisterFromRegistry(final String resourceName);
|
||||
void unregisterFromRegistry(String resourceName);
|
||||
|
||||
void registerAddress(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
|
@ -44,7 +44,7 @@ public interface ProtocolManager<P extends BaseInterceptor> {
|
||||
|
||||
ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
|
||||
|
||||
void removeHandler(final String name);
|
||||
void removeHandler(String name);
|
||||
|
||||
void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer);
|
||||
|
||||
|
@ -154,7 +154,7 @@ public abstract class CertificateLoginModule extends PropertiesLoader implements
|
||||
* @param certs The distinguished name.
|
||||
* @return The unique name if the certificate is recognized, null otherwise.
|
||||
*/
|
||||
protected abstract String getUserNameForCertificates(final X509Certificate[] certs) throws LoginException;
|
||||
protected abstract String getUserNameForCertificates(X509Certificate[] certs) throws LoginException;
|
||||
|
||||
/**
|
||||
* Should return a set of the roles this user belongs to. The roles
|
||||
@ -164,7 +164,7 @@ public abstract class CertificateLoginModule extends PropertiesLoader implements
|
||||
* getUserNameForDn returned for the user's DN.
|
||||
* @return A Set of the names of the roles this user belongs to.
|
||||
*/
|
||||
protected abstract Set<String> getUserRoles(final String username) throws LoginException;
|
||||
protected abstract Set<String> getUserRoles(String username) throws LoginException;
|
||||
|
||||
protected String getDistinguishedName(final X509Certificate[] certs) {
|
||||
if (certs != null && certs.length > 0 && certs[0] != null) {
|
||||
|
@ -606,7 +606,8 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (i++ <= 200 && hasValue);
|
||||
}
|
||||
while (i++ <= 200 && hasValue);
|
||||
|
||||
for (WeakReference<?> ref : references) {
|
||||
Assert.assertNull(ref.get());
|
||||
@ -1144,7 +1145,8 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < timeout);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeout);
|
||||
|
||||
String msg = "Timed out waiting for cluster topology of live=" + liveNodes + ",backup=" + backupNodes +
|
||||
" (received live=" + liveNodesCount + ", backup=" + backupNodesCount +
|
||||
@ -1175,7 +1177,8 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < timeout);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeout);
|
||||
|
||||
String msg = "Timed out waiting for cluster topology of " + nodes +
|
||||
" (received " +
|
||||
@ -1878,7 +1881,8 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < timeout);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeout);
|
||||
|
||||
String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
|
||||
" (expecting " +
|
||||
|
@ -19,9 +19,9 @@ package org.apache.activemq.artemis.service.extensions.xa.recovery;
|
||||
|
||||
public interface ActiveMQRegistry {
|
||||
|
||||
void register(final XARecoveryConfig resourceConfig);
|
||||
void register(XARecoveryConfig resourceConfig);
|
||||
|
||||
void unRegister(final XARecoveryConfig resourceConfig);
|
||||
void unRegister(XARecoveryConfig resourceConfig);
|
||||
|
||||
void stop();
|
||||
|
||||
|
@ -46,9 +46,7 @@ under the License.
|
||||
|
||||
<!-- Modifier Checks -->
|
||||
<module name="ModifierOrder"/>
|
||||
<module name="RedundantModifier">
|
||||
<property name="tokens" value="ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
|
||||
</module>
|
||||
<module name="RedundantModifier"/>
|
||||
|
||||
<!-- Checks for common coding problems -->
|
||||
<module name="EmptyStatement"/>
|
||||
@ -73,7 +71,7 @@ under the License.
|
||||
<module name="RightCurly"/>
|
||||
<module name="RightCurly">
|
||||
<property name="option" value="alone"/>
|
||||
<property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
|
||||
<property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT"/>
|
||||
</module>
|
||||
<!-- Checks that there is no whitespace after certain tokens; e.g. "." and "!". -->
|
||||
<module name="NoWhitespaceAfter"/>
|
||||
|
@ -571,7 +571,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||
buffer.put(duplicate);
|
||||
protonTransport.processInput();
|
||||
source.position(source.position() + limit);
|
||||
} while (source.hasRemaining());
|
||||
}
|
||||
while (source.hasRemaining());
|
||||
|
||||
ReferenceCountUtil.release(incoming);
|
||||
|
||||
|
@ -894,7 +894,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (incoming != null);
|
||||
}
|
||||
while (incoming != null);
|
||||
|
||||
super.processDeliveryUpdates(connection, delivery);
|
||||
}
|
||||
|
@ -380,7 +380,8 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
|
||||
serverSessions.add(session);
|
||||
}
|
||||
}
|
||||
} while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
|
||||
}
|
||||
while (running.get() && serverSessions.size() != numberOfSessions && timeout > System.currentTimeMillis());
|
||||
|
||||
System.err.println("Returning " + serverSessions.size() + " sessions");
|
||||
return serverSessions;
|
||||
|
@ -1354,7 +1354,8 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
|
||||
foundB = consB.getBufferSize() == numberOfMessages / 2;
|
||||
|
||||
Thread.sleep(10);
|
||||
} while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
|
||||
}
|
||||
while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
|
||||
|
||||
Assert.assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() +
|
||||
", consB=" +
|
||||
|
@ -306,7 +306,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
|
||||
m.acknowledge();
|
||||
messagesConsumed++;
|
||||
}
|
||||
} while (m != null);
|
||||
}
|
||||
while (m != null);
|
||||
|
||||
assertEquals(messagesProduced.longValue(), messagesConsumed);
|
||||
}
|
||||
|
@ -710,7 +710,8 @@ public class BridgeReconnectTest extends BridgeTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < 50000);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < 50000);
|
||||
|
||||
throw new IllegalStateException("Failed to get forwarding connection");
|
||||
}
|
||||
|
@ -279,7 +279,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
|
||||
if (!exists) {
|
||||
String msg = "Timed out waiting for cluster topology of " + Arrays.toString(nodes) +
|
||||
" (received " +
|
||||
@ -367,7 +368,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
|
||||
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
|
||||
", expecting = " +
|
||||
@ -1042,7 +1044,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||
} else {
|
||||
log.info("check receive Consumer " + consumerID + " null message");
|
||||
}
|
||||
} while (message != null);
|
||||
}
|
||||
while (message != null);
|
||||
|
||||
}
|
||||
}
|
||||
@ -1221,7 +1224,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||
message.acknowledge();
|
||||
}
|
||||
}
|
||||
} while (message != null);
|
||||
}
|
||||
while (message != null);
|
||||
}
|
||||
|
||||
for (int messageCount : messageCounts) {
|
||||
@ -1313,7 +1317,8 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||
|
||||
ints.add(count);
|
||||
}
|
||||
} while (message != null);
|
||||
}
|
||||
while (message != null);
|
||||
|
||||
int[] res = new int[ints.size()];
|
||||
|
||||
|
@ -253,7 +253,8 @@ public class AsynchronousFailoverTest extends FailoverTestBase {
|
||||
} catch (ActiveMQException e) {
|
||||
fail("Invalid Exception type:" + e.getType());
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
}
|
||||
|
||||
// create the consumer with retry if failover occurs during createConsumer call
|
||||
@ -272,7 +273,8 @@ public class AsynchronousFailoverTest extends FailoverTestBase {
|
||||
} catch (ActiveMQException e) {
|
||||
fail("Invalid Exception type:" + e.getType());
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
|
||||
session.start();
|
||||
|
||||
@ -381,7 +383,8 @@ public class AsynchronousFailoverTest extends FailoverTestBase {
|
||||
log.info("#test Exception " + e, e);
|
||||
throw e;
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
|
||||
logAndSystemOut("#test Finished sending, starting consumption now");
|
||||
|
||||
@ -481,7 +484,8 @@ public class AsynchronousFailoverTest extends FailoverTestBase {
|
||||
logAndSystemOut(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
while (retry);
|
||||
} finally {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
|
@ -34,7 +34,7 @@ public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
|
||||
protected static final String QUEUES_TESTADDRESS = "queues.testaddress";
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
protected abstract void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception;
|
||||
protected abstract void setupCluster(MessageLoadBalancingType messageLoadBalancingType) throws Exception;
|
||||
|
||||
protected abstract void setupServers() throws Exception;
|
||||
|
||||
|
@ -271,7 +271,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
||||
|
||||
protected abstract TransportConfiguration getAcceptorTransportConfiguration(boolean live);
|
||||
|
||||
protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
|
||||
protected abstract TransportConfiguration getConnectorTransportConfiguration(boolean live);
|
||||
|
||||
protected ServerLocatorInternal getServerLocator() throws Exception {
|
||||
return (ServerLocatorInternal) addServerLocator(ActiveMQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false))).setRetryInterval(50);
|
||||
|
@ -121,7 +121,8 @@ public abstract class GroupingFailoverTestBase extends ClusterTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
|
||||
throw new IllegalStateException("Timed out waiting for backup announce");
|
||||
}
|
||||
|
@ -239,7 +239,8 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < timeout);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeout);
|
||||
|
||||
throw new Exception();
|
||||
}
|
||||
|
@ -136,7 +136,8 @@ public abstract class MultiThreadReattachSupportTestBase extends ActiveMQTestBas
|
||||
|
||||
runnable.checkFail();
|
||||
|
||||
} while (!failer.isExecuted());
|
||||
}
|
||||
while (!failer.isExecuted());
|
||||
|
||||
InVMConnector.resetFailures();
|
||||
|
||||
@ -189,7 +190,7 @@ public abstract class MultiThreadReattachSupportTestBase extends ActiveMQTestBas
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
|
||||
public abstract void run(ClientSessionFactory sf, int threadNum) throws Exception;
|
||||
}
|
||||
|
||||
private class Failer extends TimerTask {
|
||||
|
@ -215,7 +215,8 @@ public class RandomReattachTest extends ActiveMQTestBase {
|
||||
|
||||
do {
|
||||
runnable.run(sf);
|
||||
} while (!failer.isExecuted());
|
||||
}
|
||||
while (!failer.isExecuted());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1290,7 +1291,7 @@ public class RandomReattachTest extends ActiveMQTestBase {
|
||||
|
||||
public abstract class RunnableT {
|
||||
|
||||
abstract void run(final ClientSessionFactory sf) throws Exception;
|
||||
abstract void run(ClientSessionFactory sf) throws Exception;
|
||||
}
|
||||
|
||||
abstract static class AssertionCheckMessageHandler implements MessageHandler {
|
||||
|
@ -135,7 +135,8 @@ public abstract class TopologyClusterTestBase extends ClusterTestBase {
|
||||
if (ok) {
|
||||
return;
|
||||
}
|
||||
} while (System.currentTimeMillis() - start < 5000);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < 5000);
|
||||
Assert.fail("did not contain all expected node ID: " + actual);
|
||||
}
|
||||
|
||||
@ -191,7 +192,8 @@ public abstract class TopologyClusterTestBase extends ClusterTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < ActiveMQTestBase.WAIT_TIMEOUT);
|
||||
|
||||
log.error(clusterDescription(servers[node]));
|
||||
Assert.assertEquals("Timed out waiting for cluster connections for server " + node, expected, nodesCount);
|
||||
|
@ -449,7 +449,8 @@ public class BindingsClusterTest extends JMSClusteredTestBase {
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < 50000);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < 50000);
|
||||
|
||||
throw new IllegalStateException("Failed to get forwarding connection");
|
||||
}
|
||||
|
@ -419,7 +419,8 @@ public class JMSFailoverTest extends ActiveMQTestBase {
|
||||
} catch (JMSException e) {
|
||||
new Exception("Exception on receive message", e).printStackTrace();
|
||||
}
|
||||
} while (retryNrs < 10);
|
||||
}
|
||||
while (retryNrs < 10);
|
||||
|
||||
assertNotNull(message);
|
||||
|
||||
|
@ -331,7 +331,8 @@ public class JMSUtil {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
} while (System.currentTimeMillis() - start < timeToWait);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeToWait);
|
||||
if (!exists) {
|
||||
String msg = "Timed out waiting for cluster topology of " + backupServer +
|
||||
" (received " +
|
||||
|
@ -231,7 +231,8 @@ public class NotificationTest extends ActiveMQTestBase {
|
||||
ClientMessage message = null;
|
||||
do {
|
||||
message = notifConsumer.receive(500);
|
||||
} while (message != null);
|
||||
}
|
||||
while (message != null);
|
||||
}
|
||||
|
||||
protected static ClientMessage[] consumeMessages(final int expected,
|
||||
|
@ -159,7 +159,8 @@ public class SecurityNotificationTest extends ActiveMQTestBase {
|
||||
ClientMessage message = null;
|
||||
do {
|
||||
message = notifConsumer.receive(500);
|
||||
} while (message != null);
|
||||
}
|
||||
while (message != null);
|
||||
}
|
||||
|
||||
protected static ClientMessage[] consumeMessages(final int expected,
|
||||
|
@ -465,7 +465,8 @@ public class MQTTTest extends MQTTTestSupport {
|
||||
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
|
||||
msg.ack();
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
// test non-retained message
|
||||
for (String topic : topics) {
|
||||
@ -477,7 +478,8 @@ public class MQTTTest extends MQTTTestSupport {
|
||||
assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
|
||||
msg.ack();
|
||||
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
connection.unsubscribe(new String[]{wildcard});
|
||||
connection.disconnect();
|
||||
@ -769,7 +771,8 @@ public class MQTTTest extends MQTTTestSupport {
|
||||
waitCount++;
|
||||
}
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
} while (msg != null && received++ < subs.length * 2);
|
||||
}
|
||||
while (msg != null && received++ < subs.length * 2);
|
||||
assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
|
||||
|
||||
// make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for
|
||||
@ -928,7 +931,8 @@ public class MQTTTest extends MQTTTestSupport {
|
||||
assertEquals(TOPIC, new String(msg.getPayload()));
|
||||
msg.ack();
|
||||
}
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
// make sure we received all message ids
|
||||
for (short id = 1; id <= TOTAL_MESSAGES; id++) {
|
||||
|
@ -183,7 +183,8 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
|
||||
m.setExpiration(expiration);
|
||||
producer.send(m);
|
||||
Thread.sleep(100);
|
||||
} while (System.currentTimeMillis() < sendMessagesUntil);
|
||||
}
|
||||
while (System.currentTimeMillis() < sendMessagesUntil);
|
||||
Assert.assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
|
||||
consumer.close();
|
||||
|
||||
@ -197,7 +198,8 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
|
||||
cm.acknowledge();
|
||||
Assert.assertFalse(dummyMessageHandler.payloads.contains(text));
|
||||
dummyMessageHandler.payloads.add(text);
|
||||
} while (true);
|
||||
}
|
||||
while (true);
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
if (dummyMessageHandler.payloads.isEmpty()) {
|
||||
|
@ -144,27 +144,27 @@ public interface Server extends Remote {
|
||||
boolean supportsLoadBalancing,
|
||||
int dupsOkBatchSize,
|
||||
boolean blockOnAcknowledge,
|
||||
final String... jndiBindings) throws Exception;
|
||||
String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String objectName,
|
||||
int prefetchSize,
|
||||
int defaultTempQueueFullSize,
|
||||
int defaultTempQueuePageSize,
|
||||
int defaultTempQueueDownCacheSize,
|
||||
final String... jndiBindings) throws Exception;
|
||||
String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String objectName,
|
||||
boolean supportsFailover,
|
||||
boolean supportsLoadBalancing,
|
||||
final String... jndiBindings) throws Exception;
|
||||
String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String clientID, String objectName, final String... jndiBindings) throws Exception;
|
||||
void deployConnectionFactory(String clientID, String objectName, String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String objectName, int prefetchSize, final String... jndiBindings) throws Exception;
|
||||
void deployConnectionFactory(String objectName, int prefetchSize, String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String objectName, final String... jndiBindings) throws Exception;
|
||||
void deployConnectionFactory(String objectName, String... jndiBindings) throws Exception;
|
||||
|
||||
void deployConnectionFactory(String objectName, JMSFactoryType type, final String... jndiBindings) throws Exception;
|
||||
void deployConnectionFactory(String objectName, JMSFactoryType type, String... jndiBindings) throws Exception;
|
||||
|
||||
void undeployConnectionFactory(String objectName) throws Exception;
|
||||
|
||||
|
@ -91,7 +91,8 @@ public class PageStressTest extends ActiveMQTestBase {
|
||||
System.out.println("Received " + msgs);
|
||||
}
|
||||
}
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
session.commit();
|
||||
|
||||
@ -122,7 +123,8 @@ public class PageStressTest extends ActiveMQTestBase {
|
||||
System.out.println("Received " + msgs);
|
||||
}
|
||||
}
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
System.out.println("msgs second time: " + msgs);
|
||||
|
||||
@ -208,7 +210,8 @@ public class PageStressTest extends ActiveMQTestBase {
|
||||
|
||||
}
|
||||
}
|
||||
} while (msg != null);
|
||||
}
|
||||
while (msg != null);
|
||||
|
||||
session.commit();
|
||||
|
||||
|
@ -221,7 +221,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||
*/
|
||||
public interface ListenerHoldCallback {
|
||||
|
||||
void callbackAdded(final ByteBuffer bytes);
|
||||
void callbackAdded(ByteBuffer bytes);
|
||||
}
|
||||
|
||||
private class CallbackRunnable implements Runnable {
|
||||
|
Loading…
x
Reference in New Issue
Block a user