NO-JIRA reformat AMQPMirrorControllerTarget

This commit is contained in:
Clebert Suconic 2022-06-30 14:52:52 -04:00 committed by clebertsuconic
parent d90179b99c
commit da38fcce71
1 changed files with 37 additions and 56 deletions

View File

@ -77,26 +77,29 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
private static ThreadLocal<MirrorController> controllerThreadLocal = new ThreadLocal<>();
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
public static void setControllerInUse(MirrorController controller) {
controllerThreadLocal.set(controller);
CONTROLLER_THREAD_LOCAL.set(controller);
}
public static MirrorController getControllerInUse() {
return controllerThreadLocal.get();
return CONTROLLER_THREAD_LOCAL.get();
}
/** Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
* the operation context. Don't use transaction and OperationContext at the same time
* as that would generate duplicates on the objects cache.
/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
* the operation context. Don't use transaction and OperationContext at the same time
* as that would generate duplicates on the objects cache.
*/
class ACKMessageOperation implements IOCallback, Runnable {
Delivery delivery;
/** notice that when you use the Transaction, you need to make sure you don't use the IO*/
/**
* notice that when you use the Transaction, you need to make sure you don't use the IO
*/
public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
@ -135,7 +138,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void onError(int errorCode, String errorMessage) {
logger.warn(errorMessage + "-" + errorMessage);
logger.warn(errorCode + "-" + errorMessage);
}
}
@ -181,10 +184,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
protected void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
recoverContext();
incrementSettle();
if (logger.isTraceEnabled()) {
logger.trace(server + "::actualdelivery call for " + message);
}
@ -204,29 +205,20 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (eventType != null) {
if (eventType.equals(ADD_ADDRESS)) {
AddressInfo addressInfo = parseAddress(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding Address " + addressInfo);
}
addAddress(addressInfo);
} else if (eventType.equals(DELETE_ADDRESS)) {
AddressInfo addressInfo = parseAddress(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Removing Address " + addressInfo);
}
deleteAddress(addressInfo);
} else if (eventType.equals(CREATE_QUEUE)) {
QueueConfiguration queueConfiguration = parseQueue(message);
if (logger.isDebugEnabled()) {
logger.debug(server + " Creating queue " + queueConfiguration);
}
createQueue(queueConfiguration);
} else if (eventType.equals(DELETE_QUEUE)) {
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
if (logger.isDebugEnabled()) {
logger.debug(server + " Deleting queue " + queueName + " on address " + address);
}
deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
} else if (eventType.equals(POST_ACK)) {
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
@ -239,17 +231,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, QUEUE);
AmqpValue value = (AmqpValue) message.getBody();
Long messageID = (Long) value.getValue();
if (logger.isDebugEnabled()) {
logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
}
if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
messageAckOperation = null;
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(server + " Sending message " + message);
}
if (sendMessage(message, messageAckOperation)) {
// since the send was successful, we give up the reference here,
// so there won't be any call on afterCompleteOperations
@ -269,7 +255,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void initialize() throws Exception {
super.initialize();
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
// Match the settlement mode of the remote instead of relying on the default of MIXED.
receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode());
@ -279,24 +264,22 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
flow();
}
private QueueConfiguration parseQueue(AMQPMessage message) throws Exception {
AmqpValue bodyvalue = (AmqpValue) message.getBody();
String body = (String) bodyvalue.getValue();
QueueConfiguration queueConfiguration = QueueConfiguration.fromJSON(body);
return queueConfiguration;
private QueueConfiguration parseQueue(AMQPMessage message) {
AmqpValue bodyValue = (AmqpValue) message.getBody();
String body = (String) bodyValue.getValue();
return QueueConfiguration.fromJSON(body);
}
private AddressInfo parseAddress(AMQPMessage message) throws Exception {
AmqpValue bodyvalue = (AmqpValue) message.getBody();
String body = (String) bodyvalue.getValue();
AddressInfo addressInfo = AddressInfo.fromJSON(body);
return addressInfo;
private AddressInfo parseAddress(AMQPMessage message) {
AmqpValue bodyValue = (AmqpValue) message.getBody();
String body = (String) bodyValue.getValue();
return AddressInfo.fromJSON(body);
}
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding address " + addressInfo);
logger.debug(server + " adding address " + addressInfo);
}
server.addAddressInfo(addressInfo);
}
@ -319,12 +302,12 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug(server + " Adding queue " + queueConfiguration);
logger.debug(server + " adding queue " + queueConfiguration);
}
try {
server.createQueue(queueConfiguration, true);
} catch (Exception ignored) {
logger.debug("Queue could not be created, already existed " + queueConfiguration, ignored);
} catch (Exception e) {
logger.debug("Queue could not be created, already existed " + queueConfiguration, e);
}
}
@ -334,13 +317,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.debug(server + " destroy queue " + queueName + " on address = " + addressName + " server " + server.getIdentity());
}
try {
server.destroyQueue(queueName,null, false, true, false, false);
server.destroyQueue(queueName, null, false, true, false, false);
} catch (ActiveMQNonExistentQueueException expected) {
logger.debug(server + " queue " + queueName + " was previously removed", expected);
}
}
public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
public boolean postAcknowledge(String queue,
String nodeID,
long messageID,
ACKMessageOperation ackMessage,
AckReason reason) throws Exception {
final Queue targetQueue = server.locateQueue(queue);
if (targetQueue == null) {
@ -447,13 +434,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
internalID = internalIDLong;
}
if (logger.isTraceEnabled()) {
logger.trace("sendMessage on server " + server + " for message " + message +
" with internalID = " + internalIDLong + " mirror id " + internalMirrorID);
logger.trace("sendMessage on server " + server + " for message " + message + " with internalID = " + internalIDLong + " mirror id " + internalMirrorID);
}
routingContext.setDuplicateDetection(false); // we do our own duplicate detection here
DuplicateIDCache duplicateIDCache;
@ -499,16 +483,14 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
return true;
}
/**
* @param ref
* @param reason
*/
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
// Do nothing
}
@Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
// Do nothing
}
class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
@ -527,7 +509,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
/**
* Method to retry the ack before a scan
* @return
*/
@Override
public boolean getAsBoolean() {